Fabcoin Core  0.16.2
P2P Digital Currency
Host.cpp
Go to the documentation of this file.
1 /*
2  This file is part of cpp-ethereum.
3 
4  cpp-ethereum is free software: you can redistribute it and/or modify
5  it under the terms of the GNU General Public License as published by
6  the Free Software Foundation, either version 3 of the License, or
7  (at your option) any later version.
8 
9  cpp-ethereum is distributed in the hope that it will be useful,
10  but WITHOUT ANY WARRANTY; without even the implied warranty of
11  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12  GNU General Public License for more details.
13 
14  You should have received a copy of the GNU General Public License
15  along with cpp-ethereum. If not, see <http://www.gnu.org/licenses/>.
16 */
23 #include <set>
24 #include <chrono>
25 #include <thread>
26 #include <mutex>
27 #include <memory>
28 #include <boost/algorithm/string.hpp>
29 #include <libdevcore/Common.h>
30 #include <libdevcore/Assertions.h>
31 #include <libdevcore/CommonIO.h>
32 #include <libdevcore/Exceptions.h>
33 #include <libdevcore/FileSystem.h>
34 #include "Session.h"
35 #include "Common.h"
36 #include "Capability.h"
37 #include "UPnP.h"
38 #include "RLPxHandshake.h"
39 #include "Host.h"
40 using namespace std;
41 using namespace dev;
42 using namespace dev::p2p;
43 
/// Interval at which Host::run will call keepAlivePeers to ping peers.
45 std::chrono::seconds const c_keepAliveInterval = std::chrono::seconds(30);
46 
/// Disconnect timeout after failure to respond to keepAlivePeers ping.
48 std::chrono::milliseconds const c_keepAliveTimeOut = std::chrono::milliseconds(1000);
49 
50 HostNodeTableHandler::HostNodeTableHandler(Host& _host): m_host(_host) {}
51 
// Function body whose signature line (listing line 52) is absent from this dump;
// presumably HostNodeTableHandler::processEvent(_n, _e) -- TODO confirm against Host.h.
// Forwards the node-table event unchanged to the owning Host.
53 {
54  m_host.onNodeTableEvent(_n, _e);
55 }
56 
// Empty function body; its signature line (listing line 57) is absent from this dump.
// NOTE(review): the owning function cannot be determined from here -- restore the signature
// from the original Host.cpp before relying on this definition.
58 {
59 }
60 
61 void ReputationManager::noteRude(SessionFace const& _s, std::string const& _sub)
62 {
63  DEV_WRITE_GUARDED(x_nodes)
64  m_nodes[make_pair(_s.id(), _s.info().clientVersion)].subs[_sub].isRude = true;
65 }
66 
67 bool ReputationManager::isRude(SessionFace const& _s, std::string const& _sub) const
68 {
69  DEV_READ_GUARDED(x_nodes)
70  {
71  auto nit = m_nodes.find(make_pair(_s.id(), _s.info().clientVersion));
72  if (nit == m_nodes.end())
73  return false;
74  auto sit = nit->second.subs.find(_sub);
75  bool ret = sit == nit->second.subs.end() ? false : sit->second.isRude;
76  return _sub.empty() ? ret : (ret || isRude(_s));
77  }
78  return false;
79 }
80 
81 void ReputationManager::setData(SessionFace const& _s, std::string const& _sub, bytes const& _data)
82 {
83  DEV_WRITE_GUARDED(x_nodes)
84  m_nodes[make_pair(_s.id(), _s.info().clientVersion)].subs[_sub].data = _data;
85 }
86 
87 bytes ReputationManager::data(SessionFace const& _s, std::string const& _sub) const
88 {
89  DEV_READ_GUARDED(x_nodes)
90  {
91  auto nit = m_nodes.find(make_pair(_s.id(), _s.info().clientVersion));
92  if (nit == m_nodes.end())
93  return bytes();
94  auto sit = nit->second.subs.find(_sub);
95  return sit == nit->second.subs.end() ? bytes() : sit->second.data;
96  }
97  return bytes();
98 }
99 
/// Primary constructor: records the client version, network preferences and alias key pair,
/// snapshots the local interface addresses and logs the resulting node id.
/// The worker is created under the name "p2p".
/// @param _clientVersion Client version string stored in m_clientVersion.
/// @param _alias         Key pair stored in m_alias.
/// @param _n             Network preferences stored in m_netPrefs.
/// NOTE(review): listing line 108 is missing between `m_alias(_alias),` and the opening brace,
/// so the final member initializer is not visible here -- restore it from the original file.
100 Host::Host(string const& _clientVersion, KeyPair const& _alias, NetworkPreferences const& _n):
101  Worker("p2p", 0),
102  m_clientVersion(_clientVersion),
103  m_netPrefs(_n),
104  m_ifAddresses(Network::getInterfaceAddresses()),
105  m_ioService(2),
106  m_tcp4Acceptor(m_ioService),
107  m_alias(_alias),
109 {
110  clog(NetNote) << "Id:" << id();
111 }
112 
113 Host::Host(string const& _clientVersion, NetworkPreferences const& _n, bytesConstRef _restoreNetwork):
114  Host(_clientVersion, networkAlias(_restoreNetwork), _n)
115 {
116  m_restoreNetwork = _restoreNetwork.toBytes();
117 }
118 
// Body of Host::~Host() (signature line, listing 119, is absent from this dump).
// Stops the network before the object is destroyed; per the file's own index this
// "will block on network process events".
120 {
121  stop();
122 }
123 
// Body of the network start routine; the signature line (listing 124) is absent from this
// dump -- presumably Host::start(), TODO confirm.
// Starts the worker, then polls every 10 ms until either the network is available or the
// worker has stopped.
125 {
 // NOTE(review): listing line 126 is missing from this dump.
127  startWorking();
128  while (isWorking() && !haveNetwork())
129  this_thread::sleep_for(chrono::milliseconds(10));
130 
131  // worker still running here means startup succeeded; otherwise fall through to the failure path
132  if (isWorking())
133  return;
134 
135  clog(NetWarn) << "Network start failed!";
136  doneWorking();
137 }
138 
// Body of Host::stop() ("Stop network."); the signature line (listing 139) is absent from this dump.
// Clears m_run under x_runTimer, waits until m_timer has been reset (run() does this once it
// observes m_run == false), then stops the worker thread if it is still working.
140 {
141  // called to force io_service to kill any remaining tasks it might have -
142  // such tasks may involve socket reads from Capabilities that maintain references
143  // to resources we're about to free.
144 
145  {
146  // Although m_run is set by stop() or start(), it affects m_runTimer, so x_runTimer is used instead of a dedicated mutex for m_run.
147  Guard l(x_runTimer);
148  // ignore if already stopped/stopping
149  if (!m_run)
150  return;
151 
152  // signal run() to prepare for shutdown and reset m_timer
153  m_run = false;
154  }
155 
156  // wait for m_timer to reset (indicating network scheduler has stopped); polls every 50 ms
157  while (!!m_timer)
158  this_thread::sleep_for(chrono::milliseconds(50));
159 
160  // stop worker thread
161  if (isWorking())
162  stopWorking();
163 }
164 
// Shutdown sequence; the signature line (listing 165) is absent from this dump --
// presumably Host::doneWorking(), TODO confirm.
// Order: reset io_service, drop timers, close the acceptor and drain a pending accept,
// notify capabilities, cancel pending handshakes, disconnect connected sessions, then
// stop/reset the io_service and clear m_sessions.
166 {
167  // reset ioservice (cancels all timers and allows manually polling network, below)
168  m_ioService.reset();
169 
170  DEV_GUARDED(x_timers)
171  m_timers.clear();
172 
173  // shutdown acceptor
174  m_tcp4Acceptor.cancel();
175  if (m_tcp4Acceptor.is_open())
176  m_tcp4Acceptor.close();
177 
178  // There may be an incoming connection which started but hasn't finished.
179  // Wait for acceptor to end itself instead of assuming it's complete.
180  // This helps ensure a peer isn't stopped at the same time it's starting
181  // and that socket for pending connection is closed.
182  while (m_accepting)
183  m_ioService.poll();
184 
185  // stop capabilities (eth: stops syncing or block/tx broadcast)
186  for (auto const& h: m_capabilities)
187  h.second->onStopping();
188 
189  // disconnect pending handshake, before peers, as a handshake may create a peer
 // n counts live handshakes cancelled in this pass; loop until a pass finds none
190  for (unsigned n = 0;; n = 0)
191  {
192  DEV_GUARDED(x_connecting)
193  for (auto const& i: m_connecting)
194  if (auto h = i.lock())
195  {
196  h->cancel();
197  n++;
198  }
199  if (!n)
200  break;
201  m_ioService.poll();
202  }
203 
204  // disconnect peers
 // n counts still-connected sessions asked to disconnect in this pass; loop until none remain
205  for (unsigned n = 0;; n = 0)
206  {
207  DEV_RECURSIVE_GUARDED(x_sessions)
208  for (auto i: m_sessions)
209  if (auto p = i.second.lock())
210  if (p->isConnected())
211  {
212  p->disconnect(ClientQuit);
213  n++;
214  }
215  if (!n)
216  break;
217 
218  // poll so that peers send out disconnect packets
219  m_ioService.poll();
220  }
221 
222  // stop network (again; helpful to call before subsequent reset())
223  m_ioService.stop();
224 
225  // reset network (allows reusing ioservice in future)
226  m_ioService.reset();
227 
228  // finally, clear out peers (in case they're lingering)
229  RecursiveGuard l(x_sessions);
230  m_sessions.clear();
231 }
232 
/// Creates and registers a Session for a peer once its hello RLP is available.
///
/// Looks up or creates the Peer record for _id, parses the hello fields from _rlp
/// (items 0..4: protocol version, client version, capabilities, listen port, public key),
/// filters the advertised capabilities down to the highest mutually supported versions,
/// and then either disconnects the new session with a specific reason or registers it
/// in m_sessions and starts it.
///
/// @param _id  Node id the session is expected to belong to.
/// @param _rlp Hello payload.
/// @param _io  Frame coder; ownership is moved into the Session.
/// @param _s   Socket for the session.
233 void Host::startPeerSession(Public const& _id, RLP const& _rlp, unique_ptr<RLPXFrameCoder>&& _io, std::shared_ptr<RLPXSocket> const& _s)
234 {
235  // session may be ingress or egress so m_peers and node table entries may not exist
236  shared_ptr<Peer> p;
237  DEV_RECURSIVE_GUARDED(x_sessions)
238  {
239  if (m_peers.count(_id))
240  p = m_peers[_id];
241  else
242  {
243  // peer doesn't exist, try to get port info from node table
244  if (m_nodeTable)
245  if (Node n = m_nodeTable->node(_id))
246  p = make_shared<Peer>(n);
247 
248  if (!p)
249  p = make_shared<Peer>(Node(_id, UnspecifiedNodeIPEndpoint));
250 
251  m_peers[_id] = p;
252  }
253  }
254  if (p->isOffline())
255  p->m_lastConnected = std::chrono::system_clock::now();
256  p->endpoint.address = _s->remoteEndpoint().address();
257 
258  auto protocolVersion = _rlp[0].toInt<unsigned>();
259  auto clientVersion = _rlp[1].toString();
260  auto caps = _rlp[2].toVector<CapDesc>();
261  auto listenPort = _rlp[3].toInt<unsigned short>();
262  auto pub = _rlp[4].toHash<Public>();
263 
 // the identity claimed in the hello must match the id this session was started for
264  if (pub != _id)
265  {
266  cdebug << "Wrong ID: " << pub << " vs. " << _id;
267  return;
268  }
269 
270  // clang error (previously: ... << hex << caps ...)
271  // "'operator<<' should be declared prior to the call site or in an associated namespace of one of its arguments"
272  stringstream capslog;
273 
274  // leave only highest mutually supported capability version
275  caps.erase(remove_if(caps.begin(), caps.end(), [&](CapDesc const& _r){ return !haveCapability(_r) || any_of(caps.begin(), caps.end(), [&](CapDesc const& _o){ return _r.first == _o.first && _o.second > _r.second && haveCapability(_o); }); }), caps.end());
276 
277  for (auto cap: caps)
278  capslog << "(" << cap.first << "," << dec << cap.second << ")";
279 
280  clog(NetMessageSummary) << "Hello: " << clientVersion << "V[" << protocolVersion << "]" << _id << showbase << capslog.str() << dec << listenPort;
281 
282  // create session so disconnects are managed
283  shared_ptr<SessionFace> ps = make_shared<Session>(this, move(_io), _s, p, PeerSessionInfo({_id, clientVersion, p->endpoint.address.to_string(), listenPort, chrono::steady_clock::duration(), _rlp[2].toSet<CapDesc>(), 0, map<string, string>(), protocolVersion}));
284  if (protocolVersion < dev::p2p::c_protocolVersion - 1)
285  {
286  ps->disconnect(IncompatibleProtocol);
287  return;
288  }
289  if (caps.empty())
290  {
291  ps->disconnect(UselessPeer);
292  return;
293  }
294 
 // NOTE(review): m_requiredPeers is read here without x_requiredPeers, which
 // relinquishPeer() holds while erasing -- confirm whether this needs the guard.
295  if (m_netPrefs.pin && !m_requiredPeers.count(_id))
296  {
297  cdebug << "Unexpected identity from peer (got" << _id << ", must be one of " << m_requiredPeers << ")";
298  ps->disconnect(UnexpectedIdentity);
299  return;
300  }
301 
302  {
303  RecursiveGuard l(x_sessions);
304  if (m_sessions.count(_id) && !!m_sessions[_id].lock())
305  if (auto s = m_sessions[_id].lock())
306  if(s->isConnected())
307  {
308  // Already connected.
309  clog(NetWarn) << "Session already exists for peer with id" << _id;
310  ps->disconnect(DuplicatePeer);
311  return;
312  }
313 
314  if (!peerSlotsAvailable())
315  {
316  ps->disconnect(TooManyPeers);
317  return;
318  }
319 
 // offset: running packet-id offset for unframed sessions; cnt: per-capability
 // counter passed to newPeerCapability for framed sessions
320  unsigned offset = (unsigned)UserPacket;
321  uint16_t cnt = 1;
322 
323  // todo: mutex Session::m_capabilities and move for(:caps) out of mutex.
324  for (auto const& i: caps)
325  {
326  auto pcap = m_capabilities[i];
327  if (!pcap)
328  return ps->disconnect(IncompatibleProtocol);
329 
330  if (Session::isFramingAllowedForVersion(protocolVersion))
331  pcap->newPeerCapability(ps, 0, i, cnt++);
332  else
333  {
334  pcap->newPeerCapability(ps, offset, i, 0);
335  offset += pcap->messageCount();
336  }
337  }
338 
 // the session is started before it is published in m_sessions, all under x_sessions
339  ps->start();
340  m_sessions[_id] = ps;
341  }
342 
343  clog(NetP2PNote) << "p2p.host.peer.register" << _id;
344 }
345 
// Body of the node-table event callback invoked as m_host.onNodeTableEvent(_n, _e);
// the signature line (listing 346) is absent from this dump.
// NodeEntryAdded: creates or refreshes the Peer for _n and connects if an egress slot is free.
// NodeEntryDropped: forgets the Peer, but only when its type is PeerType::Optional.
347 {
348  if (_e == NodeEntryAdded)
349  {
350  clog(NetP2PNote) << "p2p.host.nodeTable.events.nodeEntryAdded " << _n;
351  // only add iff node is in node table
 // NOTE(review): m_nodeTable is dereferenced without the null check used elsewhere in this file -- confirm it cannot be reset here.
352  if (Node n = m_nodeTable->node(_n))
353  {
354  shared_ptr<Peer> p;
355  DEV_RECURSIVE_GUARDED(x_sessions)
356  {
357  if (m_peers.count(_n))
358  {
359  p = m_peers[_n];
360  p->endpoint = n.endpoint;
361  }
362  else
363  {
364  p = make_shared<Peer>(n);
365  m_peers[_n] = p;
366  clog(NetP2PNote) << "p2p.host.peers.events.peerAdded " << _n << p->endpoint;
367  }
368  }
369  if (peerSlotsAvailable(Egress))
370  connect(p);
371  }
372  }
373  else if (_e == NodeEntryDropped)
374  {
375  clog(NetP2PNote) << "p2p.host.nodeTable.events.NodeEntryDropped " << _n;
376  RecursiveGuard l(x_sessions);
377  if (m_peers.count(_n) && m_peers[_n]->peerType == PeerType::Optional)
378  m_peers.erase(_n);
379  }
380 }
381 
// Body of Host::determinePublic(): determines and sets m_tcpPublic to the publicly
// advertised address. The signature line (listing 382) is absent from this dump.
383 {
384  // set m_tcpPublic := listenIP (if public) > public > upnp > unspecified address.
385 
386  auto ifAddresses = Network::getInterfaceAddresses();
387  auto laddr = m_netPrefs.listenIPAddress.empty() ? bi::address() : bi::address::from_string(m_netPrefs.listenIPAddress);
388  auto lset = !laddr.is_unspecified();
389  auto paddr = m_netPrefs.publicIPAddress.empty() ? bi::address() : bi::address::from_string(m_netPrefs.publicIPAddress);
390  auto pset = !paddr.is_unspecified();
391 
 // listenIsPublic: a listen address is configured and is itself public
 // publicIsHost: no listen address, but the configured public address is one of this host's interfaces
392  bool listenIsPublic = lset && isPublicAddress(laddr);
393  bool publicIsHost = !lset && pset && ifAddresses.count(paddr);
394 
 // default: unspecified address on the listen port
395  bi::tcp::endpoint ep(bi::address(), m_listenPort);
396  if (m_netPrefs.traverseNAT && listenIsPublic)
397  {
398  clog(NetNote) << "Listen address set to Public address:" << laddr << ". UPnP disabled.";
399  ep.address(laddr);
400  }
401  else if (m_netPrefs.traverseNAT && publicIsHost)
402  {
403  clog(NetNote) << "Public address set to Host configured address:" << paddr << ". UPnP disabled.";
404  ep.address(paddr);
405  }
406  else if (m_netPrefs.traverseNAT)
407  {
408  bi::address natIFAddr;
 // restrict NAT traversal to the listen address when it is one of our interfaces, else try all interfaces
409  ep = Network::traverseNAT(lset && ifAddresses.count(laddr) ? std::set<bi::address>({laddr}) : ifAddresses, m_listenPort, natIFAddr);
410 
411  if (lset && natIFAddr != laddr)
412  // if listen address is set, Host will use it, even if upnp returns different
413  clog(NetWarn) << "Listen address" << laddr << "differs from local address" << natIFAddr << "returned by UPnP!";
414 
415  if (pset && ep.address() != paddr)
416  {
417  // if public address is set, Host will advertise it, even if upnp returns different
418  clog(NetWarn) << "Specified public address" << paddr << "differs from external address" << ep.address() << "returned by UPnP!";
419  ep.address(paddr);
420  }
421  }
422  else if (pset)
423  ep.address(paddr);
424 
425  m_tcpPublic = ep;
426 }
427 
// Body of the acceptor loop; the signature line (listing 428) is absent from this dump --
// presumably Host::runAcceptor(), which this body re-invokes from its completion handler.
// Arms one async_accept at a time (tracked by m_accepting); each accepted socket either
// starts an RLPx handshake or is closed, and the acceptor is then re-armed.
429 {
430  assert(m_listenPort > 0);
431 
432  if (m_run && !m_accepting)
433  {
434  clog(NetConnect) << "Listening on local port " << m_listenPort << " (public: " << m_tcpPublic << ")";
435  m_accepting = true;
436 
437  auto socket = make_shared<RLPXSocket>(m_ioService);
438  m_tcp4Acceptor.async_accept(socket->ref(), [=](boost::system::error_code ec)
439  {
440  m_accepting = false;
 // on error or shutdown: drop the socket and do not re-arm the acceptor
441  if (ec || !m_run)
442  {
443  socket->close();
444  return;
445  }
446  if (peerCount() > peerSlots(Ingress))
447  {
448  clog(NetConnect) << "Dropping incoming connect due to maximum peer count (" << Ingress << " * ideal peer count): " << socket->remoteEndpoint();
449  socket->close();
 // NOTE(review): ec is known to be falsy at this point, so this condition looks always-true -- confirm intent.
450  if (ec.value() < 1)
451  runAcceptor();
452  return;
453  }
454 
455  bool success = false;
456  try
457  {
458  // incoming connection; we don't yet know nodeid
459  auto handshake = make_shared<RLPXHandshake>(this, socket);
 // NOTE(review): m_connecting is modified here without x_connecting, which connect() and run() hold when touching it -- confirm.
460  m_connecting.push_back(handshake);
461  handshake->start();
462  success = true;
463  }
464  catch (Exception const& _e)
465  {
466  clog(NetWarn) << "ERROR: " << diagnostic_information(_e);
467  }
468  catch (std::exception const& _e)
469  {
470  clog(NetWarn) << "ERROR: " << _e.what();
471  }
472 
473  if (!success)
474  socket->ref().close();
475  runAcceptor();
476  });
477  }
478 }
479 
480 std::unordered_map<Public, std::string> Host::pocHosts()
481 {
482  return {
483  // Mainnet:
484  { Public("a979fb575495b8d6db44f750317d0f4622bf4c2aa3365d6af7c284339968eef29b69ad0dce72a4d8db5ebb4968de0e3bec910127f134779fbcb0cb6d3331163c"), "52.16.188.185:30303" },
485  { Public("de471bccee3d042261d52e9bff31458daecc406142b401d4cd848f677479f73104b9fdeb090af9583d3391b7f10cb2ba9e26865dd5fca4fcdc0fb1e3b723c786"), "54.94.239.50:30303" },
486  { Public("1118980bf48b0a3640bdba04e0fe78b1add18e1cd99bf22d53daac1fd9972ad650df52176e7c7d89d1114cfef2bc23a2959aa54998a46afcf7d91809f0855082"), "52.74.57.123:30303" },
487  // Testnet:
488  { Public("e4533109cc9bd7604e4ff6c095f7a1d807e15b38e9bfeb05d3b7c423ba86af0a9e89abbf40bd9dde4250fef114cd09270fa4e224cbeef8b7bf05a51e8260d6b8"), "94.242.229.4:40404" },
489  { Public("8c336ee6f03e99613ad21274f269479bf4413fb294d697ef15ab897598afb931f56beb8e97af530aee20ce2bcba5776f4a312bc168545de4d43736992c814592"), "94.242.229.203:30303" },
490  };
491 }
492 
493 void Host::addPeer(NodeSpec const& _s, PeerType _t)
494 {
495  if (_t == PeerType::Optional)
496  addNode(_s.id(), _s.nodeIPEndpoint());
497  else
498  requirePeer(_s.id(), _s.nodeIPEndpoint());
499 }
500 
501 void Host::addNode(NodeID const& _node, NodeIPEndpoint const& _endpoint)
502 {
503  // return if network is stopped while waiting on Host::run() or nodeTable to start
504  while (!haveNetwork())
505  if (isWorking())
506  this_thread::sleep_for(chrono::milliseconds(50));
507  else
508  return;
509 
510  if (_endpoint.tcpPort < 30300 || _endpoint.tcpPort > 30305)
511  clog(NetConnect) << "Non-standard port being recorded: " << _endpoint.tcpPort;
512 
513  if (m_nodeTable)
514  m_nodeTable->addNode(Node(_node, _endpoint));
515 }
516 
517 void Host::requirePeer(NodeID const& _n, NodeIPEndpoint const& _endpoint)
518 {
519  m_requiredPeers.insert(_n);
520 
521  if (!m_run)
522  return;
523 
524  Node node(_n, _endpoint, PeerType::Required);
525  if (_n)
526  {
527  // create or update m_peers entry
528  shared_ptr<Peer> p;
529  DEV_RECURSIVE_GUARDED(x_sessions)
530  if (m_peers.count(_n))
531  {
532  p = m_peers[_n];
533  p->endpoint = node.endpoint;
534  p->peerType = PeerType::Required;
535  }
536  else
537  {
538  p = make_shared<Peer>(node);
539  m_peers[_n] = p;
540  }
541  // required for discovery
542  if (m_nodeTable)
543  m_nodeTable->addNode(*p, NodeTable::NodeRelation::Unknown);
544  }
545  else if (m_nodeTable)
546  {
547  m_nodeTable->addNode(node);
548  auto t = make_shared<boost::asio::deadline_timer>(m_ioService);
549  t->expires_from_now(boost::posix_time::milliseconds(600));
550  t->async_wait([this, _n](boost::system::error_code const& _ec)
551  {
552  if (!_ec)
553  if (m_nodeTable)
554  if (auto n = m_nodeTable->node(_n))
555  requirePeer(n.id, n.endpoint);
556  });
557  DEV_GUARDED(x_timers)
558  m_timers.push_back(t);
559  }
560 }
561 
562 void Host::relinquishPeer(NodeID const& _node)
563 {
564  Guard l(x_requiredPeers);
565  if (m_requiredPeers.count(_node))
566  m_requiredPeers.erase(_node);
567 }
568 
569 void Host::connect(std::shared_ptr<Peer> const& _p)
570 {
571  if (!m_run)
572  return;
573 
574  if (havePeerSession(_p->id))
575  {
576  clog(NetConnect) << "Aborted connect. Node already connected.";
577  return;
578  }
579 
580  if (!!m_nodeTable && !m_nodeTable->haveNode(_p->id) && _p->peerType == PeerType::Optional)
581  return;
582 
583  // prevent concurrently connecting to a node
584  Peer *nptr = _p.get();
585  {
586  Guard l(x_pendingNodeConns);
587  if (m_pendingPeerConns.count(nptr))
588  return;
589  m_pendingPeerConns.insert(nptr);
590  }
591 
592  _p->m_lastAttempted = std::chrono::system_clock::now();
593 
594  bi::tcp::endpoint ep(_p->endpoint);
595  clog(NetConnect) << "Attempting connection to node" << _p->id << "@" << ep << "from" << id();
596  auto socket = make_shared<RLPXSocket>(m_ioService);
597  socket->ref().async_connect(ep, [=](boost::system::error_code const& ec)
598  {
599  _p->m_lastAttempted = std::chrono::system_clock::now();
600  _p->m_failedAttempts++;
601 
602  if (ec)
603  {
604  clog(NetConnect) << "Connection refused to node" << _p->id << "@" << ep << "(" << ec.message() << ")";
605  // Manually set error (session not present)
606  _p->m_lastDisconnect = TCPError;
607  }
608  else
609  {
610  clog(NetConnect) << "Connecting to" << _p->id << "@" << ep;
611  auto handshake = make_shared<RLPXHandshake>(this, socket, _p->id);
612  {
613  Guard l(x_connecting);
614  m_connecting.push_back(handshake);
615  }
616 
617  handshake->start();
618  }
619 
620  Guard l(x_pendingNodeConns);
621  m_pendingPeerConns.erase(nptr);
622  });
623 }
624 
// Body of Host::peerSessionInfo() const ("Get peer information."); the signature line
// (listing 625) is absent from this dump.
// Returns the info() of every session that is still alive and connected, or an empty
// list when the network is not running.
626 {
627  if (!m_run)
628  return PeerSessionInfos();
629 
630  std::vector<PeerSessionInfo> ret;
631  RecursiveGuard l(x_sessions);
632  for (auto& i: m_sessions)
633  if (auto j = i.second.lock())
634  if (j->isConnected())
635  ret.push_back(j->info());
636  return ret;
637 }
638 
639 size_t Host::peerCount() const
640 {
641  unsigned retCount = 0;
642  RecursiveGuard l(x_sessions);
643  for (auto& i: m_sessions)
644  if (std::shared_ptr<SessionFace> j = i.second.lock())
645  if (j->isConnected())
646  retCount++;
647  return retCount;
648 }
649 
650 void Host::run(boost::system::error_code const&)
651 {
652  if (!m_run)
653  {
654  // reset NodeTable
655  m_nodeTable.reset();
656 
657  // stopping io service allows running manual network operations for shutdown
658  // and also stops blocking worker thread, allowing worker thread to exit
659  m_ioService.stop();
660 
661  // resetting timer signals network that nothing else can be scheduled to run
662  m_timer.reset();
663  return;
664  }
665 
666  m_nodeTable->processEvents();
667 
668  // cleanup zombies
669  DEV_GUARDED(x_connecting)
670  m_connecting.remove_if([](std::weak_ptr<RLPXHandshake> h){ return h.expired(); });
671  DEV_GUARDED(x_timers)
672  m_timers.remove_if([](std::shared_ptr<boost::asio::deadline_timer> t)
673  {
674  return t->expires_from_now().total_milliseconds() < 0;
675  });
676 
677  keepAlivePeers();
678 
679  // At this time peers will be disconnected based on natural TCP timeout.
680  // disconnectLatePeers needs to be updated for the assumption that Session
681  // is always live and to ensure reputation and fallback timers are properly
682  // updated. // disconnectLatePeers();
683 
684  // todo: update peerSlotsAvailable()
685 
686  list<shared_ptr<Peer>> toConnect;
687  unsigned reqConn = 0;
688  {
689  RecursiveGuard l(x_sessions);
690  for (auto const& p: m_peers)
691  {
692  bool haveSession = havePeerSession(p.second->id);
693  bool required = p.second->peerType == PeerType::Required;
694  if (haveSession && required)
695  reqConn++;
696  else if (!haveSession && p.second->shouldReconnect() && (!m_netPrefs.pin || required))
697  toConnect.push_back(p.second);
698  }
699  }
700 
701  for (auto p: toConnect)
702  if (p->peerType == PeerType::Required && reqConn++ < m_idealPeerCount)
703  connect(p);
704 
705  if (!m_netPrefs.pin)
706  {
707  unsigned pendingCount = 0;
708  DEV_GUARDED(x_pendingNodeConns)
709  pendingCount = m_pendingPeerConns.size();
710  int openSlots = m_idealPeerCount - peerCount() - pendingCount + reqConn;
711  if (openSlots > 0)
712  for (auto p: toConnect)
713  if (p->peerType == PeerType::Optional && openSlots--)
714  connect(p);
715  }
716 
717  auto runcb = [this](boost::system::error_code const& error) { run(error); };
718  m_timer->expires_from_now(boost::posix_time::milliseconds(c_timerInterval));
719  m_timer->async_wait(runcb);
720 }
721 
// Worker start-up body; the signature line (listing 722) is absent from this dump --
// presumably Host::startedWorking(), TODO confirm.
// Creates the scheduler timer and sets m_run, starts capabilities, opens the TCP acceptor,
// builds the node table, restores saved peers, and finally kicks off the first run() tick.
723 {
724  asserts(!m_timer);
725 
726  {
727  // prevent m_run from being set to true at same time as set to false by stop()
728  // don't release mutex until m_timer is set so in case stop() is called at same
729  // time, stop will wait on m_timer and graceful network shutdown.
730  Guard l(x_runTimer);
731  // create deadline timer
732  m_timer.reset(new boost::asio::deadline_timer(m_ioService));
733  m_run = true;
734  }
735 
736  // start capability threads (ready for incoming connections)
737  for (auto const& h: m_capabilities)
738  h.second->onStarting();
739 
740  // try to open acceptor (todo: ipv6)
741  int port = Network::tcp4Listen(m_tcp4Acceptor, m_netPrefs);
742  if (port > 0)
743  {
744  m_listenPort = port;
745  determinePublic();
746  runAcceptor();
747  }
748  else
749  clog(NetP2PNote) << "p2p.start.notice id:" << id() << "TCP Listen port is invalid or unavailable.";
750 
 // the node table is created even when the TCP acceptor could not be opened
751  auto nodeTable = make_shared<NodeTable>(
752  m_ioService,
753  m_alias,
754  NodeIPEndpoint(bi::address::from_string(listenAddress()), listenPort(), listenPort()),
755  m_netPrefs.discovery
756  );
 // NOTE(review): the handler is allocated with raw new; presumably setEventHandler takes ownership -- confirm.
757  nodeTable->setEventHandler(new HostNodeTableHandler(*this));
758  m_nodeTable = nodeTable;
759  restoreNetwork(&m_restoreNetwork);
760 
761  clog(NetP2PNote) << "p2p.started id:" << id();
762 
763  run(boost::system::error_code());
764 }
765 
// Body of Host::doWork() ("Run network. Not thread-safe; to be called only by worker.");
// the signature line (listing 766) is absent from this dump.
// Runs the io_service while m_run is set; std::exception is logged instead of propagated.
767 {
768  try
769  {
770  if (m_run)
771  m_ioService.run();
772  }
773  catch (std::exception const& _e)
774  {
775  clog(NetP2PWarn) << "Exception in Network Thread:" << _e.what();
776  clog(NetP2PWarn) << "Network Restart is Recommended.";
777  }
778 }
779 
// Keep-alive body; the signature line (listing 780) is absent from this dump --
// presumably Host::keepAlivePeers(), which run() calls on every tick. TODO confirm.
// At most once per c_keepAliveInterval: pings every live session and erases the
// m_sessions entries whose session object has expired.
781 {
 // rate limit: skip if the last ping round was less than c_keepAliveInterval ago
782  if (chrono::steady_clock::now() - c_keepAliveInterval < m_lastPing)
783  return;
784 
785  RecursiveGuard l(x_sessions);
786  for (auto it = m_sessions.begin(); it != m_sessions.end();)
787  if (auto p = it->second.lock())
788  {
789  p->ping();
790  ++it;
791  }
792  else
793  it = m_sessions.erase(it);
794 
795  m_lastPing = chrono::steady_clock::now();
796 }
797 
// Body of Host::disconnectLatePeers(): disconnect peers which didn't respond to the
// keepAlivePeers ping prior to c_keepAliveTimeOut. The signature line (listing 798) is
// absent from this dump. The only reference visible in this file is commented out in run().
799 {
800  auto now = chrono::steady_clock::now();
 // nothing to do until c_keepAliveTimeOut has elapsed since the last ping round
801  if (now - c_keepAliveTimeOut < m_lastPing)
802  return;
803 
804  RecursiveGuard l(x_sessions);
805  for (auto p: m_sessions)
806  if (auto pp = p.second.lock())
 // a session is late if nothing has been received from it since the last ping round
807  if (now - c_keepAliveTimeOut > m_lastPing && pp->lastReceived() < m_lastPing)
808  pp->disconnect(PingTimeout);
809 }
810 
// Body of Host::saveNetwork() const ("Serialise the set of known peers."); the signature
// line (listing 811) is absent from this dump.
// Output layout: a 3-item RLP list [protocol version, alias secret, nodes], where nodes
// holds 11-item peer records followed by 4-item node-table records.
// NOTE(review): the serialised blob contains m_alias.secret() -- callers should treat it as sensitive.
812 {
 // copy the peers under the lock, then serialise the copies without holding it
813  std::list<Peer> peers;
814  {
815  RecursiveGuard l(x_sessions);
816  for (auto p: m_peers)
817  if (p.second)
818  peers.push_back(*p.second);
819  }
820  peers.sort();
821 
822  RLPStream network;
823  int count = 0;
824  for (auto const& p: peers)
825  {
826  // todo: ipv6
827  if (!p.endpoint.address.is_v4())
828  continue;
829 
830  // Only save peers which have connected within 2 days, with properly-advertised port and public IP address
831  if (chrono::system_clock::now() - p.m_lastConnected < chrono::seconds(3600 * 48) && !!p.endpoint && p.id != id() && (p.peerType == PeerType::Required || p.endpoint.isAllowed()))
832  {
833  network.appendList(11);
834  p.endpoint.streamRLP(network, NodeIPEndpoint::StreamInline);
835  network << p.id << (p.peerType == PeerType::Required ? true : false)
836  << chrono::duration_cast<chrono::seconds>(p.m_lastConnected.time_since_epoch()).count()
837  << chrono::duration_cast<chrono::seconds>(p.m_lastAttempted.time_since_epoch()).count()
838  << p.m_failedAttempts << (unsigned)p.m_lastDisconnect << p.m_score << p.m_rating;
839  count++;
840  }
841  }
842 
843  if (!!m_nodeTable)
844  {
845  auto state = m_nodeTable->snapshot();
846  state.sort();
847  for (auto const& entry: state)
848  {
849  network.appendList(4);
850  entry.endpoint.streamRLP(network, NodeIPEndpoint::StreamInline);
851  network << entry.id;
852  count++;
853  }
854  }
855  // else: TODO: use previous configuration if available
856 
857  RLPStream ret(3);
858  ret << dev::p2p::c_protocolVersion << m_alias.secret().ref();
859  ret.appendList(count);
860  if (!!count)
861  ret.appendRaw(network.out(), count);
862  return ret.out();
863 }
864 
// Body of Host::restoreNetwork(bytesConstRef _b): deserialise the data and populate the
// set of known peers. The signature line (listing 865) is absent from this dump.
// Accepts records of 4 or 11 items (endpoint streamed inline) and the older 3 or 10 item
// form (IPv4 bytes plus a single port). Short records go to the node table only; long
// records also recreate the Peer with its saved statistics.
866 {
867  if (!_b.size())
868  return;
869 
870  // nodes can only be added if network is added
871  if (!isStarted())
872  BOOST_THROW_EXCEPTION(NetworkStartRequired());
873 
874  if (m_dropPeers)
875  return;
876 
877  RecursiveGuard l(x_sessions);
878  RLP r(_b);
 // NOTE(review): r[0] is converted before the itemCount()/isInt() checks on the next line -- confirm this is safe for malformed input.
879  unsigned fileVersion = r[0].toInt<unsigned>();
880  if (r.itemCount() > 0 && r[0].isInt() && fileVersion >= dev::p2p::c_protocolVersion - 1)
881  {
882  // r[0] = version
883  // r[1] = key
884  // r[2] = nodes
885 
886  for (auto i: r[2])
887  {
888  // todo: ipv6
889  if (i[0].itemCount() != 4 && i[0].size() != 4)
890  continue;
891 
892  if (i.itemCount() == 4 || i.itemCount() == 11)
893  {
894  Node n((NodeID)i[3], NodeIPEndpoint(i));
895  if (i.itemCount() == 4 && n.endpoint.isAllowed())
896  m_nodeTable->addNode(n);
897  else if (i.itemCount() == 11)
898  {
899  n.peerType = i[4].toInt<bool>() ? PeerType::Required : PeerType::Optional;
 // NOTE(review): listing line 900 is missing from this dump; the `continue` below presumably belongs to a condition on that line -- restore it from the original file.
901  continue;
902  shared_ptr<Peer> p = make_shared<Peer>(n);
903  p->m_lastConnected = chrono::system_clock::time_point(chrono::seconds(i[5].toInt<unsigned>()));
904  p->m_lastAttempted = chrono::system_clock::time_point(chrono::seconds(i[6].toInt<unsigned>()));
905  p->m_failedAttempts = i[7].toInt<unsigned>();
906  p->m_lastDisconnect = (DisconnectReason)i[8].toInt<unsigned>();
907  p->m_score = (int)i[9].toInt<unsigned>();
908  p->m_rating = (int)i[10].toInt<unsigned>();
909  m_peers[p->id] = p;
910  if (p->peerType == PeerType::Required)
911  requirePeer(p->id, n.endpoint);
912  else
913  m_nodeTable->addNode(*p.get(), NodeTable::NodeRelation::Known);
914  }
915  }
916  else if (i.itemCount() == 3 || i.itemCount() == 10)
917  {
 // older record form: i[0] = 4 address bytes, i[1] = port used for both endpoint ports, i[2] = node id
918  Node n((NodeID)i[2], NodeIPEndpoint(bi::address_v4(i[0].toArray<byte, 4>()), i[1].toInt<uint16_t>(), i[1].toInt<uint16_t>()));
919  if (i.itemCount() == 3 && n.endpoint.isAllowed())
920  m_nodeTable->addNode(n);
921  else if (i.itemCount() == 10)
922  {
923  n.peerType = i[3].toInt<bool>() ? PeerType::Required : PeerType::Optional;
 // NOTE(review): listing line 924 is missing from this dump; the `continue` below presumably belongs to a condition on that line -- restore it from the original file.
925  continue;
926  shared_ptr<Peer> p = make_shared<Peer>(n);
927  p->m_lastConnected = chrono::system_clock::time_point(chrono::seconds(i[4].toInt<unsigned>()));
928  p->m_lastAttempted = chrono::system_clock::time_point(chrono::seconds(i[5].toInt<unsigned>()));
929  p->m_failedAttempts = i[6].toInt<unsigned>();
930  p->m_lastDisconnect = (DisconnectReason)i[7].toInt<unsigned>();
931  p->m_score = (int)i[8].toInt<unsigned>();
932  p->m_rating = (int)i[9].toInt<unsigned>();
933  m_peers[p->id] = p;
934  if (p->peerType == PeerType::Required)
935  requirePeer(p->id, n.endpoint);
936  else
937  m_nodeTable->addNode(*p.get(), NodeTable::NodeRelation::Known);
938  }
939  }
940  }
941  }
942 }
943 
// Body of Host::networkAlias(bytesConstRef _b): get or create the host identifier (KeyPair).
// The signature line (listing 944) is absent from this dump.
// Reuses the secret stored in item 1 when _b is a 3-item list whose first item is an
// integer >= 3; otherwise creates a new random key pair.
945 {
946  RLP r(_b);
947  if (r.itemCount() == 3 && r[0].isInt() && r[0].toInt<unsigned>() >= 3)
948  return KeyPair(Secret(r[1].toBytes()));
949  else
950  return KeyPair::create();
951 }
void determinePublic()
Determines and sets m_tcpPublic to publicly advertised address.
Definition: Host.cpp:382
void restoreNetwork(bytesConstRef _b)
Deserialise the data and populate the set of known peers.
Definition: Host.cpp:865
union node node
Adapted from code found on http://stackoverflow.com/questions/180947/base64-decode-snippet-in-c Origi...
Definition: Arith256.cpp:15
bool error(const char *fmt, const Args &...args)
Definition: util.h:178
std::chrono::milliseconds const c_keepAliveTimeOut
Disconnect timeout after failure to respond to keepAlivePeers ping.
Definition: Host.cpp:48
std::set< T > toSet(int _flags=LaissezFaire) const
Definition: RLP.h:219
bool isAllowed() const
Definition: Common.h:196
h256s subs(bytesConstRef _node)
void setData(SessionFace const &_s, std::string const &_sub, bytes const &_data)
Definition: Host.cpp:81
void addPeer(NodeSpec const &_s, PeerType _t)
Add a potential peer.
Definition: Host.cpp:493
bool isInt() const
Integer value. Must not have a leading zero.
Definition: RLP.cpp:132
std::pair< std::string, u256 > CapDesc
Definition: Common.h:142
size_t itemCount() const
Definition: RLP.h:118
std::vector< T > toVector(int _flags=LaissezFaire) const
Definition: RLP.h:204
static KeyPair networkAlias(bytesConstRef _b)
Get or create host identifier (KeyPair).
Definition: Host.cpp:944
static bool isFramingAllowedForVersion(unsigned _version)
Definition: Session.h:93
void noteRude(SessionFace const &_s, std::string const &_sub=std::string())
Definition: Host.cpp:61
#define asserts(A)
Definition: Assertions.h:41
virtual ~Host()
Will block on network process events.
Definition: Host.cpp:119
#define h(i)
Definition: sha.cpp:736
bytes const & out() const
Read the byte stream.
Definition: RLP.h:433
Simple class that represents a "key pair".
Definition: Common.h:150
size_t count
Definition: ExecStats.cpp:37
void run(boost::system::error_code const &error)
Called by startedWorking. Not thread-safe; to be called only by Worker.
Definition: Host.cpp:650
The Host class Capabilities should be registered prior to startNetwork, since m_capabilities is not t...
Definition: Host.h:129
std::vector< PeerSessionInfo > PeerSessionInfos
Definition: Common.h:164
bool isPublicAddress(bi::address const &_addressToCheck)
Definition: Common.cpp:76
static KeyPair create()
Create a new, randomly generated object.
Definition: Common.cpp:307
SecureFixedHash< 32 > Secret
Definition: Common.h:35
static int tcp4Listen(bi::tcp::acceptor &_acceptor, NetworkPreferences const &_netPrefs)
Try to bind and listen on _listenPort, else attempt net-allocated port.
Definition: Network.cpp:119
void disconnectLatePeers()
Disconnect peers which didn't respond to keepAlivePeers ping prior to c_keepAliveTimeOut.
Definition: Host.cpp:798
std::hash for asio::address
Definition: Common.h:323
NodeTableEventType
Definition: NodeTable.h:48
assert(len-trim+(2 *lenIndices)<=WIDTH)
const unsigned c_protocolVersion
Peer network protocol version.
Definition: Common.cpp:28
virtual void doWork()
Run network. Not thread-safe; to be called only by worker.
Definition: Host.cpp:766
bool isRude(SessionFace const &_s, std::string const &_sub=std::string()) const
Definition: Host.cpp:67
IPv4, UDP/TCP endpoints.
Definition: Common.h:175
void stop()
Stop network.
Definition: Host.cpp:139
ExecStats::duration min
Definition: ExecStats.cpp:35
h512 Public
A public key: 64 bytes.
Definition: Common.h:39
size_t peerCount() const
Get number of peers connected.
Definition: Host.cpp:639
DisconnectReason
Definition: Common.h:106
bytes saveNetwork() const
Serialise the set of known peers.
Definition: Host.cpp:811
void connect(std::shared_ptr< Peer > const &_p)
Definition: Host.cpp:569
#define DEV_GUARDED(MUTEX)
Simple block guard.
Definition: Guards.h:144
#define cdebug
Definition: Log.h:302
#define DEV_RECURSIVE_GUARDED(MUTEX)
Definition: Guards.h:150
_N toHash(int _flags=Strict) const
Definition: RLP.h:298
Base class for all exceptions.
Definition: Exceptions.h:39
std::lock_guard< std::mutex > Guard
Definition: Guards.h:41
static std::set< bi::address > getInterfaceAddresses()
Definition: Network.cpp:46
clock::duration duration
Definition: bench.h:50
#define DEV_WRITE_GUARDED(MUTEX)
Definition: Guards.h:148
NodeID id() const
Definition: Common.h:227
PeerSessionInfos peerSessionInfo() const
Get peer information.
Definition: Host.cpp:625
static bi::tcp::endpoint traverseNAT(std::set< bi::address > const &_ifAddresses, unsigned short _listenPort, bi::address &o_upnpInterfaceAddr)
Return public endpoint of upnp interface. If successful o_upnpInterfaceAddr will be a private interface addr...
Definition: Network.cpp:176
#define DEV_READ_GUARDED(MUTEX)
Definition: Guards.h:146
std::vector< byte > bytes
Definition: Common.h:75
void requirePeer(NodeID const &_node, NodeIPEndpoint const &_endpoint)
Create Peer and attempt keeping peer connected.
Definition: Host.cpp:517
std::vector< unsigned char > toBytes() const
Definition: vector_ref.h:45
std::chrono::seconds const c_keepAliveInterval
Interval at which Host::run will call keepAlivePeers to ping peers.
Definition: Host.cpp:45
RLPStream & appendList(size_t _items)
Appends a list.
Definition: RLP.cpp:276
void onNodeTableEvent(NodeID const &_n, NodeTableEventType const &_e)
Definition: Host.cpp:346
void addNode(NodeID const &_node, NodeIPEndpoint const &_endpoint)
Add node as a peer candidate. Node is added if discovery ping is successful and table has capacity...
Definition: Host.cpp:501
size_t size() const
Definition: vector_ref.h:55
void start()
Start network.
Definition: Host.cpp:124
NodeIPEndpoint nodeIPEndpoint() const
Definition: Common.cpp:233
void runAcceptor()
Called only from startedWorking().
Definition: Host.cpp:428
virtual void processEvent(NodeID const &_n, NodeTableEventType const &_e)
Definition: Host.cpp:52
bytes data(SessionFace const &_s, std::string const &_subs) const
Definition: Host.cpp:87
std::string const clientVersion
Definition: Common.h:154
PeerType
Definition: Common.h:166
void relinquishPeer(NodeID const &_node)
Note peer as no longer being required.
Definition: Host.cpp:562
clock::time_point time_point
Definition: bench.h:49
std::string toString(int _flags=LaissezFaire) const
Converts to string.
Definition: RLP.h:199
static std::unordered_map< Public, std::string > pocHosts()
Default hosts for current version of client.
Definition: Host.cpp:480
#define clog(X)
Definition: Log.h:295
std::lock_guard< std::recursive_mutex > RecursiveGuard
Definition: Guards.h:43
PeerType peerType
Definition: Common.h:261
struct evm_uint160be address(struct evm_env *env)
Definition: capi.c:13
Host(std::string const &_clientVersion, NetworkPreferences const &_n=NetworkPreferences(), bytesConstRef _restoreNetwork=bytesConstRef())
Start server, listening for connections on the given port.
Representation of connectivity state and all other pertinent Peer metadata.
Definition: Peer.h:52
virtual PeerSessionInfo info() const =0
HostNodeTableHandler(Host &_host)
Definition: Host.cpp:50
_T toInt(int _flags=Strict) const
Converts to int of type given; if isString(), decodes as big-endian bytestream.
Definition: RLP.h:275
Class for writing to an RLP bytestream.
Definition: RLP.h:383
virtual void startedWorking()
Called by Worker. Not thread-safe; to be called only by worker.
Definition: Host.cpp:722
const NodeIPEndpoint UnspecifiedNodeIPEndpoint
Definition: Common.cpp:32
RLPStream & appendRaw(bytesConstRef _rlp, size_t _itemCount=1)
Appends raw (pre-serialised) RLP data. Use with caution.
Definition: RLP.cpp:230
virtual void doneWorking()
Shutdown network. Not thread-safe; to be called only by worker.
Definition: Host.cpp:165
#define DEV_TIMED_FUNCTION_ABOVE(MS)
Definition: Common.h:300
uint8_t const * data
Definition: sha3.h:19
Class for interpreting Recursive Linear-Prefix Data.
Definition: RLP.h:64
void keepAlivePeers()
Ping the peers to update the latency information and disconnect peers which have timed out...
Definition: Host.cpp:780
virtual NodeID id() const =0
UniValue stop(const JSONRPCRequest &jsonRequest)
Definition: server.cpp:237
NodeIPEndpoint endpoint
Endpoints by which we expect to reach node.
Definition: Common.h:258
void startPeerSession(Public const &_id, RLP const &_hello, std::unique_ptr< RLPXFrameCoder > &&_io, std::shared_ptr< RLPXSocket > const &_s)
Validates and starts peer session, taking ownership of _io. Disconnects and returns false upon error...
Definition: Host.cpp:233