28 #include <boost/algorithm/string.hpp> 50 HostNodeTableHandler::HostNodeTableHandler(
Host& _host): m_host(_host) {}
72 if (nit == m_nodes.end())
74 auto sit = nit->second.subs.find(_sub);
75 bool ret = sit == nit->second.subs.end() ?
false : sit->second.isRude;
76 return _sub.empty() ? ret : (ret || isRude(_s));
92 if (nit == m_nodes.end())
94 auto sit = nit->second.subs.find(_sub);
95 return sit == nit->second.subs.end() ?
bytes() : sit->second.data;
102 m_clientVersion(_clientVersion),
106 m_tcp4Acceptor(m_ioService),
114 Host(_clientVersion, networkAlias(_restoreNetwork), _n)
116 m_restoreNetwork = _restoreNetwork.
toBytes();
128 while (isWorking() && !haveNetwork())
129 this_thread::sleep_for(chrono::milliseconds(10));
158 this_thread::sleep_for(chrono::milliseconds(50));
174 m_tcp4Acceptor.cancel();
175 if (m_tcp4Acceptor.is_open())
176 m_tcp4Acceptor.close();
186 for (
auto const&
h: m_capabilities)
187 h.second->onStopping();
190 for (
unsigned n = 0;; n = 0)
193 for (
auto const& i: m_connecting)
194 if (
auto h = i.lock())
205 for (
unsigned n = 0;; n = 0)
208 for (
auto i: m_sessions)
209 if (
auto p = i.second.lock())
210 if (p->isConnected())
239 if (m_peers.count(_id))
245 if (
Node n = m_nodeTable->node(_id))
246 p = make_shared<Peer>(n);
255 p->m_lastConnected = std::chrono::system_clock::now();
256 p->endpoint.address = _s->remoteEndpoint().address();
258 auto protocolVersion = _rlp[0].
toInt<
unsigned>();
259 auto clientVersion = _rlp[1].
toString();
261 auto listenPort = _rlp[3].
toInt<
unsigned short>();
266 cdebug <<
"Wrong ID: " << pub <<
" vs. " << _id;
272 stringstream capslog;
275 caps.erase(remove_if(caps.begin(), caps.end(), [&](
CapDesc const& _r){
return !haveCapability(_r) || any_of(caps.begin(), caps.end(), [&](
CapDesc const& _o){
return _r.first == _o.first && _o.second > _r.second && haveCapability(_o); }); }), caps.end());
278 capslog <<
"(" << cap.first <<
"," << dec << cap.second <<
")";
280 clog(
NetMessageSummary) <<
"Hello: " << clientVersion <<
"V[" << protocolVersion <<
"]" << _id << showbase << capslog.str() << dec << listenPort;
283 shared_ptr<SessionFace> ps = make_shared<Session>(
this, move(_io), _s, p,
PeerSessionInfo({_id, clientVersion, p->endpoint.address.to_string(), listenPort,
chrono::steady_clock::duration(), _rlp[2].
toSet<
CapDesc>(), 0, map<string, string>(), protocolVersion}));
295 if (m_netPrefs.pin && !m_requiredPeers.count(_id))
297 cdebug <<
"Unexpected identity from peer (got" << _id <<
", must be one of " << m_requiredPeers <<
")";
304 if (m_sessions.count(_id) && !!m_sessions[_id].lock())
305 if (
auto s = m_sessions[_id].lock())
309 clog(
NetWarn) <<
"Session already exists for peer with id" << _id;
314 if (!peerSlotsAvailable())
324 for (
auto const& i: caps)
326 auto pcap = m_capabilities[i];
331 pcap->newPeerCapability(ps, 0, i, cnt++);
334 pcap->newPeerCapability(ps, offset, i, 0);
335 offset += pcap->messageCount();
340 m_sessions[_id] = ps;
350 clog(
NetP2PNote) <<
"p2p.host.nodeTable.events.nodeEntryAdded " << _n;
352 if (
Node n = m_nodeTable->node(_n))
357 if (m_peers.count(_n))
360 p->endpoint = n.endpoint;
364 p = make_shared<Peer>(n);
366 clog(
NetP2PNote) <<
"p2p.host.peers.events.peerAdded " << _n << p->endpoint;
369 if (peerSlotsAvailable(Egress))
375 clog(
NetP2PNote) <<
"p2p.host.nodeTable.events.NodeEntryDropped " << _n;
387 auto laddr = m_netPrefs.listenIPAddress.empty() ?
bi::address() : bi::address::from_string(m_netPrefs.listenIPAddress);
388 auto lset = !laddr.is_unspecified();
389 auto paddr = m_netPrefs.publicIPAddress.empty() ?
bi::address() : bi::address::from_string(m_netPrefs.publicIPAddress);
390 auto pset = !paddr.is_unspecified();
393 bool publicIsHost = !lset && pset && ifAddresses.count(paddr);
396 if (m_netPrefs.traverseNAT && listenIsPublic)
398 clog(
NetNote) <<
"Listen address set to Public address:" << laddr <<
". UPnP disabled.";
401 else if (m_netPrefs.traverseNAT && publicIsHost)
403 clog(
NetNote) <<
"Public address set to Host configured address:" << paddr <<
". UPnP disabled.";
406 else if (m_netPrefs.traverseNAT)
409 ep =
Network::traverseNAT(lset && ifAddresses.count(laddr) ? std::set<bi::address>({laddr}) : ifAddresses, m_listenPort, natIFAddr);
411 if (lset && natIFAddr != laddr)
413 clog(
NetWarn) <<
"Listen address" << laddr <<
"differs from local address" << natIFAddr <<
"returned by UPnP!";
415 if (pset && ep.address() != paddr)
418 clog(
NetWarn) <<
"Specified public address" << paddr <<
"differs from external address" << ep.address() <<
"returned by UPnP!";
432 if (m_run && !m_accepting)
434 clog(
NetConnect) <<
"Listening on local port " << m_listenPort <<
" (public: " << m_tcpPublic <<
")";
437 auto socket = make_shared<RLPXSocket>(m_ioService);
438 m_tcp4Acceptor.async_accept(socket->ref(), [=](boost::system::error_code ec)
446 if (peerCount() > peerSlots(Ingress))
448 clog(
NetConnect) <<
"Dropping incoming connect due to maximum peer count (" << Ingress <<
" * ideal peer count): " << socket->remoteEndpoint();
455 bool success =
false;
459 auto handshake = make_shared<RLPXHandshake>(
this, socket);
460 m_connecting.push_back(handshake);
466 clog(
NetWarn) <<
"ERROR: " << diagnostic_information(_e);
468 catch (std::exception
const& _e)
474 socket->ref().close();
484 {
Public(
"a979fb575495b8d6db44f750317d0f4622bf4c2aa3365d6af7c284339968eef29b69ad0dce72a4d8db5ebb4968de0e3bec910127f134779fbcb0cb6d3331163c"),
"52.16.188.185:30303" },
485 {
Public(
"de471bccee3d042261d52e9bff31458daecc406142b401d4cd848f677479f73104b9fdeb090af9583d3391b7f10cb2ba9e26865dd5fca4fcdc0fb1e3b723c786"),
"54.94.239.50:30303" },
486 {
Public(
"1118980bf48b0a3640bdba04e0fe78b1add18e1cd99bf22d53daac1fd9972ad650df52176e7c7d89d1114cfef2bc23a2959aa54998a46afcf7d91809f0855082"),
"52.74.57.123:30303" },
488 {
Public(
"e4533109cc9bd7604e4ff6c095f7a1d807e15b38e9bfeb05d3b7c423ba86af0a9e89abbf40bd9dde4250fef114cd09270fa4e224cbeef8b7bf05a51e8260d6b8"),
"94.242.229.4:40404" },
489 {
Public(
"8c336ee6f03e99613ad21274f269479bf4413fb294d697ef15ab897598afb931f56beb8e97af530aee20ce2bcba5776f4a312bc168545de4d43736992c814592"),
"94.242.229.203:30303" },
504 while (!haveNetwork())
506 this_thread::sleep_for(chrono::milliseconds(50));
514 m_nodeTable->addNode(
Node(_node, _endpoint));
519 m_requiredPeers.insert(_n);
530 if (m_peers.count(_n))
538 p = make_shared<Peer>(
node);
543 m_nodeTable->addNode(*p, NodeTable::NodeRelation::Unknown);
545 else if (m_nodeTable)
547 m_nodeTable->addNode(node);
548 auto t = make_shared<boost::asio::deadline_timer>(m_ioService);
549 t->expires_from_now(boost::posix_time::milliseconds(600));
550 t->async_wait([
this, _n](boost::system::error_code
const& _ec)
554 if (
auto n = m_nodeTable->node(_n))
555 requirePeer(n.id, n.endpoint);
558 m_timers.push_back(t);
564 Guard l(x_requiredPeers);
565 if (m_requiredPeers.count(_node))
566 m_requiredPeers.erase(_node);
574 if (havePeerSession(_p->id))
580 if (!!m_nodeTable && !m_nodeTable->haveNode(_p->id) && _p->peerType ==
PeerType::Optional)
584 Peer *nptr = _p.get();
586 Guard l(x_pendingNodeConns);
587 if (m_pendingPeerConns.count(nptr))
589 m_pendingPeerConns.insert(nptr);
592 _p->m_lastAttempted = std::chrono::system_clock::now();
594 bi::tcp::endpoint ep(_p->endpoint);
595 clog(
NetConnect) <<
"Attempting connection to node" << _p->id <<
"@" << ep <<
"from" << id();
596 auto socket = make_shared<RLPXSocket>(m_ioService);
597 socket->ref().async_connect(ep, [=](boost::system::error_code
const& ec)
599 _p->m_lastAttempted = std::chrono::system_clock::now();
600 _p->m_failedAttempts++;
604 clog(
NetConnect) <<
"Connection refused to node" << _p->id <<
"@" << ep <<
"(" << ec.message() <<
")";
611 auto handshake = make_shared<RLPXHandshake>(
this, socket, _p->id);
613 Guard l(x_connecting);
614 m_connecting.push_back(handshake);
620 Guard l(x_pendingNodeConns);
621 m_pendingPeerConns.erase(nptr);
630 std::vector<PeerSessionInfo> ret;
632 for (
auto& i: m_sessions)
633 if (
auto j = i.second.lock())
634 if (j->isConnected())
635 ret.push_back(j->info());
641 unsigned retCount = 0;
643 for (
auto& i: m_sessions)
644 if (std::shared_ptr<SessionFace> j = i.second.lock())
645 if (j->isConnected())
666 m_nodeTable->processEvents();
670 m_connecting.remove_if([](std::weak_ptr<RLPXHandshake>
h){
return h.expired(); });
672 m_timers.remove_if([](std::shared_ptr<boost::asio::deadline_timer> t)
674 return t->expires_from_now().total_milliseconds() < 0;
686 list<shared_ptr<Peer>> toConnect;
687 unsigned reqConn = 0;
690 for (
auto const& p: m_peers)
692 bool haveSession = havePeerSession(p.second->id);
694 if (haveSession && required)
696 else if (!haveSession && p.second->shouldReconnect() && (!m_netPrefs.pin || required))
697 toConnect.push_back(p.second);
701 for (
auto p: toConnect)
707 unsigned pendingCount = 0;
709 pendingCount = m_pendingPeerConns.size();
710 int openSlots = m_idealPeerCount - peerCount() - pendingCount + reqConn;
712 for (
auto p: toConnect)
717 auto runcb = [
this](boost::system::error_code
const&
error) { run(
error); };
718 m_timer->expires_from_now(boost::posix_time::milliseconds(c_timerInterval));
719 m_timer->async_wait(runcb);
732 m_timer.reset(
new boost::asio::deadline_timer(m_ioService));
737 for (
auto const&
h: m_capabilities)
738 h.second->onStarting();
749 clog(
NetP2PNote) <<
"p2p.start.notice id:" << id() <<
"TCP Listen port is invalid or unavailable.";
751 auto nodeTable = make_shared<NodeTable>(
754 NodeIPEndpoint(bi::address::from_string(listenAddress()), listenPort(), listenPort()),
758 m_nodeTable = nodeTable;
759 restoreNetwork(&m_restoreNetwork);
763 run(boost::system::error_code());
773 catch (std::exception
const& _e)
786 for (
auto it = m_sessions.begin(); it != m_sessions.end();)
787 if (
auto p = it->second.lock())
793 it = m_sessions.erase(it);
795 m_lastPing = chrono::steady_clock::now();
800 auto now = chrono::steady_clock::now();
805 for (
auto p: m_sessions)
806 if (
auto pp = p.second.lock())
813 std::list<Peer> peers;
816 for (
auto p: m_peers)
818 peers.push_back(*p.second);
824 for (
auto const& p: peers)
827 if (!p.endpoint.address.is_v4())
831 if (chrono::system_clock::now() - p.m_lastConnected < chrono::seconds(3600 * 48) && !!p.endpoint && p.id != id() && (p.peerType ==
PeerType::Required || p.endpoint.isAllowed()))
836 << chrono::duration_cast<chrono::seconds>(p.m_lastConnected.time_since_epoch()).
count()
837 << chrono::duration_cast<chrono::seconds>(p.m_lastAttempted.time_since_epoch()).
count()
838 << p.m_failedAttempts << (unsigned)p.m_lastDisconnect << p.m_score << p.m_rating;
845 auto state = m_nodeTable->snapshot();
847 for (
auto const& entry: state)
879 unsigned fileVersion = r[0].
toInt<
unsigned>();
889 if (i[0].itemCount() != 4 && i[0].size() != 4)
892 if (i.itemCount() == 4 || i.itemCount() == 11)
896 m_nodeTable->addNode(n);
897 else if (i.itemCount() == 11)
902 shared_ptr<Peer> p = make_shared<Peer>(n);
905 p->m_failedAttempts = i[7].toInt<
unsigned>();
907 p->m_score = (int)i[9].toInt<unsigned>();
908 p->m_rating = (int)i[10].toInt<unsigned>();
913 m_nodeTable->addNode(*p.get(), NodeTable::NodeRelation::Known);
916 else if (i.itemCount() == 3 || i.itemCount() == 10)
918 Node n((
NodeID)i[2],
NodeIPEndpoint(bi::address_v4(i[0].toArray<byte, 4>()), i[1].toInt<uint16_t>(), i[1].toInt<uint16_t>()));
920 m_nodeTable->addNode(n);
921 else if (i.itemCount() == 10)
926 shared_ptr<Peer> p = make_shared<Peer>(n);
929 p->m_failedAttempts = i[6].toInt<
unsigned>();
931 p->m_score = (int)i[8].toInt<unsigned>();
932 p->m_rating = (int)i[9].toInt<unsigned>();
937 m_nodeTable->addNode(*p.get(), NodeTable::NodeRelation::Known);
void determinePublic()
Determines and sets m_tcpPublic to publicly advertised address.
void restoreNetwork(bytesConstRef _b)
Deserialise the data and populate the set of known peers.
Adapted from code found on http://stackoverflow.com/questions/180947/base64-decode-snippet-in-c Origi...
bool error(const char *fmt, const Args &...args)
std::chrono::milliseconds const c_keepAliveTimeOut
Disconnect timeout after failure to respond to keepAlivePeers ping.
std::set< T > toSet(int _flags=LaissezFaire) const
h256s subs(bytesConstRef _node)
void setData(SessionFace const &_s, std::string const &_sub, bytes const &_data)
void addPeer(NodeSpec const &_s, PeerType _t)
Add a potential peer.
bool isInt() const
Integer value. Must not have a leading zero.
std::pair< std::string, u256 > CapDesc
std::vector< T > toVector(int _flags=LaissezFaire) const
static KeyPair networkAlias(bytesConstRef _b)
Get or create host identifier (KeyPair).
static bool isFramingAllowedForVersion(unsigned _version)
void noteRude(SessionFace const &_s, std::string const &_sub=std::string())
virtual ~Host()
Will block on network process events.
bytes const & out() const
Read the byte stream.
Simple class that represents a "key pair".
void run(boost::system::error_code const &error)
Called by startedWorking. Not thread-safe; to be called only be Worker.
The Host class Capabilities should be registered prior to startNetwork, since m_capabilities is not t...
std::vector< PeerSessionInfo > PeerSessionInfos
bool isPublicAddress(bi::address const &_addressToCheck)
static KeyPair create()
Create a new, randomly generated object.
SecureFixedHash< 32 > Secret
static int tcp4Listen(bi::tcp::acceptor &_acceptor, NetworkPreferences const &_netPrefs)
Try to bind and listen on _listenPort, else attempt net-allocated port.
void disconnectLatePeers()
Disconnect peers which didn't respond to keepAlivePeers ping prior to c_keepAliveTimeOut.
std::hash for asio::adress
assert(len-trim+(2 *lenIndices)<=WIDTH)
const unsigned c_protocolVersion
Peer network protocol version.
virtual void doWork()
Run network. Not thread-safe; to be called only by worker.
bool isRude(SessionFace const &_s, std::string const &_sub=std::string()) const
h512 Public
A public key: 64 bytes.
size_t peerCount() const
Get number of peers connected.
bytes saveNetwork() const
Serialise the set of known peers.
void connect(std::shared_ptr< Peer > const &_p)
#define DEV_GUARDED(MUTEX)
Simple block guard.
#define DEV_RECURSIVE_GUARDED(MUTEX)
_N toHash(int _flags=Strict) const
Base class for all exceptions.
std::lock_guard< std::mutex > Guard
static std::set< bi::address > getInterfaceAddresses()
#define DEV_WRITE_GUARDED(MUTEX)
PeerSessionInfos peerSessionInfo() const
Get peer information.
static bi::tcp::endpoint traverseNAT(std::set< bi::address > const &_ifAddresses, unsigned short _listenPort, bi::address &o_upnpInterfaceAddr)
Return public endpoint of upnp interface. If successful o_upnpifaddr will be a private interface addr...
#define DEV_READ_GUARDED(MUTEX)
std::vector< byte > bytes
void requirePeer(NodeID const &_node, NodeIPEndpoint const &_endpoint)
Create Peer and attempt keeping peer connected.
std::vector< unsigned char > toBytes() const
std::chrono::seconds const c_keepAliveInterval
Interval at which Host::run will call keepAlivePeers to ping peers.
RLPStream & appendList(size_t _items)
Appends a list.
void onNodeTableEvent(NodeID const &_n, NodeTableEventType const &_e)
void addNode(NodeID const &_node, NodeIPEndpoint const &_endpoint)
Add node as a peer candidate. Node is added if discovery ping is successful and table has capacity...
void start()
Start network. .
NodeIPEndpoint nodeIPEndpoint() const
void runAcceptor()
Called only from startedWorking().
virtual void processEvent(NodeID const &_n, NodeTableEventType const &_e)
bytes data(SessionFace const &_s, std::string const &_subs) const
std::string const clientVersion
void relinquishPeer(NodeID const &_node)
Note peer as no longer being required.
clock::time_point time_point
std::string toString(int _flags=LaissezFaire) const
Converts to string.
static std::unordered_map< Public, std::string > pocHosts()
Default hosts for current version of client.
std::lock_guard< std::recursive_mutex > RecursiveGuard
struct evm_uint160be address(struct evm_env *env)
Host(std::string const &_clientVersion, NetworkPreferences const &_n=NetworkPreferences(), bytesConstRef _restoreNetwork=bytesConstRef())
Start server, listening for connections on the given port.
Representation of connectivity state and all other pertinent Peer metadata.
virtual PeerSessionInfo info() const =0
HostNodeTableHandler(Host &_host)
_T toInt(int _flags=Strict) const
Converts to int of type given; if isString(), decodes as big-endian bytestream.
Class for writing to an RLP bytestream.
virtual void startedWorking()
Called by Worker. Not thread-safe; to be called only by worker.
const NodeIPEndpoint UnspecifiedNodeIPEndpoint
RLPStream & appendRaw(bytesConstRef _rlp, size_t _itemCount=1)
Appends raw (pre-serialised) RLP data. Use with caution.
virtual void doneWorking()
Shutdown network. Not thread-safe; to be called only by worker.
#define DEV_TIMED_FUNCTION_ABOVE(MS)
Class for interpreting Recursive Linear-Prefix Data.
void keepAlivePeers()
Ping the peers to update the latency information and disconnect peers which have timed out...
virtual NodeID id() const =0
UniValue stop(const JSONRPCRequest &jsonRequest)
NodeIPEndpoint endpoint
Endpoints by which we expect to reach node.
void startPeerSession(Public const &_id, RLP const &_hello, std::unique_ptr< RLPXFrameCoder > &&_io, std::shared_ptr< RLPXSocket > const &_s)
Validates and starts peer session, taking ownership of _io. Disconnects and returns false upon error...