Fabcoin Core  0.16.2
P2P Digital Currency
WhisperHost.cpp
Go to the documentation of this file.
1 /*
2  This file is part of cpp-ethereum.
3 
4  cpp-ethereum is free software: you can redistribute it and/or modify
5  it under the terms of the GNU General Public License as published by
6  the Free Software Foundation, either version 3 of the License, or
7  (at your option) any later version.
8 
9  cpp-ethereum is distributed in the hope that it will be useful,
10  but WITHOUT ANY WARRANTY; without even the implied warranty of
11  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12  GNU General Public License for more details.
13 
14  You should have received a copy of the GNU General Public License
15  along with cpp-ethereum. If not, see <http://www.gnu.org/licenses/>.
16 */
22 #include "WhisperHost.h"
23 #include <libdevcore/CommonIO.h>
24 #include <libdevcore/Log.h>
25 #include <libp2p/All.h>
26 #include "WhisperDB.h"
27 
28 using namespace std;
29 using namespace dev;
30 using namespace dev::p2p;
31 using namespace dev::shh;
32 
// Constructs the Whisper host: initialises the Worker base with "shh" and
// stores _storeMessagesInDB in m_storeMessagesInDB.
// NOTE(review): the listing numbering jumps from 34 to 36, so one line of the
// constructor body is missing from this extract.
33 WhisperHost::WhisperHost(bool _storeMessagesInDB): Worker("shh"), m_storeMessagesInDB(_storeMessagesInDB)
34 {
36 }
37 
39 {
41 }
42 
// Body of WhisperHost::streamMessage(h256 _m, RLPStream& _s) const; the listing
// index places its definition at listing line 43, which is missing from this extract.
// If an envelope with hash _m is present in m_messages, calls its streamRLP(_s);
// otherwise does nothing.
// NOTE(review): `l` is not declared in the visible lines (listing line 45 is
// missing); presumably an UpgradableGuard on x_messages -- confirm.
44 {
46  if (m_messages.count(_m))
47  {
48  UpgradeGuard ll(l);
49  auto const& m = m_messages.at(_m);
50 // cnote << "streamRLP: " << m.expiry() << m.ttl() << m.topic() << toHex(m.data());
51  m.streamRLP(_s);
52  }
53 }
54 
// Body of WhisperHost::inject; the listing index places its definition at
// listing line 55, which is missing from this extract. Inside the body the
// envelope is named _m and the originating peer _p.
// Flow: ignore expired envelopes, store a new envelope in m_messages and
// m_expiryQueue under its sha3 hash, append the hash to matching watches,
// compute a rating, then add the rating to the originating peer and call
// noteNewMessage on every other peer.
56 {
57  // this function processes both outgoing messages originated by the local host (_p == null)
58  // and incoming messages from remote peers (_p != null)
59 
60  //cnote << this << ": inject: " << _m.expiry() << _m.ttl() << _m.topic() << toHex(_m.data());
61 
62  if (_m.isExpired())
63  return;
64 
65  auto h = _m.sha3();
// NOTE(review): `l` is not declared in the visible lines (listing line 67 is
// missing); presumably an UpgradableGuard on x_messages -- confirm.
// An envelope whose hash is already stored is dropped here without further processing.
66  {
68  if (m_messages.count(h))
69  return;
70  UpgradeGuard ll(l);
71  m_messages[h] = _m;
72  m_expiryQueue.insert(make_pair(_m.expiry(), h));
73  }
74 
75  // rating of incoming message from remote host is assessed according to the following criteria:
76  // 1. installed watch match; 2. bloom filter match; 3. ttl; 4. proof of work
77 
78  int rating = 0;
79 
// NOTE(review): listing lines 80-81 are missing, so whatever guards or
// conditions this scope is not visible here.
// Inside the scope: rating is incremented once, and for every filter matching
// _m each watch whose id equals that filter's key gets h appended to its
// changes and adds 2 to the rating.
82  {
83  ++rating;
84  for (auto const& f: m_filters)
85  if (f.second.filter.matches(_m))
86  for (auto& i: m_watches)
87  if (i.second.id == f.first) // match one of the watches
88  {
89  i.second.changes.push_back(h);
90  rating += 2;
91  }
92  }
93 
94  if (_p) // incoming message from remote peer
95  {
// Resulting value: ((rating * 256) + ttlReward) * 256 + workProved(),
// where ttlReward is 256 - ttl(), or 0 when ttl() is 256 or more.
96  rating *= 256;
97  unsigned ttlReward = (256 > _m.ttl() ? 256 - _m.ttl() : 0);
98  rating += ttlReward;
99  rating *= 256;
100  rating += _m.workProved();
101  }
102 
103  // TODO p2p: capability-based rating
104  for (auto i: peerSessions())
105  {
106  auto w = capabilityFromSession<WhisperPeer>(*i.first).get();
// The peer the message came from receives the rating; all others are told about the message.
107  if (w == _p)
108  w->addRating(rating);
109  else
110  w->noteNewMessage(h, _m);
111  }
112 }
113 
// NOTE(review): the signature (listing line 114) is missing from this extract;
// presumably this is the body of WhisperHost::installWatch, with the topics
// parameter named _t -- confirm.
// Builds an InstalledFilter from _t keyed by the sha3 of its filter, inserts it
// into m_filters or increments the refCount of the existing entry, registers a
// new ClientWatch for it and returns the new watch id.
115 {
116  InstalledFilter f(_t);
117  h256 h = f.filter.sha3();
118  unsigned ret = 0;
119 
// NOTE(review): listing lines 120 and 128 are missing from this extract, so the
// statement introducing this scope and one statement inside it are not visible.
121  {
122  auto it = m_filters.find(h);
123  if (m_filters.end() == it)
124  m_filters.insert(make_pair(h, f));
125  else
126  it->second.refCount++;
127 
// New id is one past the largest key currently in m_watches, or 0 when m_watches is empty.
129  ret = m_watches.size() ? m_watches.rbegin()->first + 1 : 0;
130  m_watches[ret] = ClientWatch(h);
131  cwatshh << "+++" << ret << h;
132  }
133 
// NOTE(review): listing line 134 is missing from this extract.
135  return ret;
136 }
137 
// NOTE(review): the signature (listing line 138) is missing from this extract;
// presumably this is the body of WhisperHost::uninstallWatch, with the watch id
// parameter named _i -- confirm.
// Erases watch _i from m_watches (returning early if it does not exist), then
// looks up the filter the watch referred to, removes that filter's bloom from
// m_bloom and erases the filter once its refCount drops to zero.
139 {
140  cwatshh << "XXX" << _i;
141 
// NOTE(review): listing line 142 is missing from this extract, so the statement
// introducing this scope is not visible.
143  {
144  auto it = m_watches.find(_i);
145  if (it == m_watches.end())
146  return;
147 
// Copy the filter id before erasing, since `it` is invalid after erase().
148  auto id = it->second.id;
149  m_watches.erase(it);
150 
151  auto fit = m_filters.find(id);
152  if (fit != m_filters.end())
153  {
// removeRaw runs on every uninstall, whether or not the filter itself is erased below.
154  m_bloom.removeRaw(fit->second.filter.exportBloom());
155  if (!--fit->second.refCount)
156  m_filters.erase(fit);
157  }
158  }
159 
// NOTE(review): listing line 160 is missing from this extract.
161 }
162 
// NOTE(review): the signature (listing line 163) is missing from this extract;
// presumably this is the body of WhisperHost::watchMessages(unsigned _watchId) -- confirm.
// Returns the hashes of all envelopes in m_messages that match the filter of
// watch _watchId; returns an empty list when the watch or its filter is not found.
164 {
165  h256s ret;
// NOTE(review): this lookup in m_watches happens before m_filterLock is taken below.
166  auto wit = m_watches.find(_watchId);
167  if (wit == m_watches.end())
168  return ret;
169  TopicFilter f;
170  {
// The filter is copied while m_filterLock is held, so matching below works on the copy.
171  Guard l(m_filterLock);
172  auto fit = m_filters.find(wit->second.id);
173  if (fit == m_filters.end())
174  return ret;
175  f = fit->second.filter;
176  }
// NOTE(review): listing line 177 is missing from this extract; presumably a
// read lock on x_messages -- confirm.
178  for (auto const& m: m_messages)
179  if (f.matches(m.second))
180  ret.push_back(m.first);
181  return ret;
182 }
183 
// Runs cleanup(), then returns the hashes accumulated in the `changes` list of
// watch _watchId and clears that list.
// Any exception raised while accessing the watch (m_watches.at throws for an
// unknown id) is swallowed, in which case the returned list is empty.
184 h256s WhisperHost::checkWatch(unsigned _watchId)
185 {
186  h256s ret;
187  cleanup();
188 
// NOTE(review): listing line 189 is missing from this extract; presumably a
// guard on m_filterLock -- confirm.
190  try
191  {
192  ret = m_watches.at(_watchId).changes;
193  m_watches.at(_watchId).changes.clear();
194  }
195  catch (...)
196  {
197  }
198 
199  return ret;
200 }
201 
// NOTE(review): the signature (listing line 202) is missing from this extract;
// presumably this is the body of WhisperHost::doWork() -- confirm.
// Calls sendMessages() on the WhisperPeer capability of every peer session,
// then runs cleanup().
203 {
204  for (auto i: peerSessions())
205  capabilityFromSession<WhisperPeer>(*i.first)->sendMessages();
206  cleanup();
207 }
208 
// NOTE(review): the signature (listing line 209) is missing from this extract;
// presumably this is the body of WhisperHost::cleanup() -- confirm.
// Walks m_expiryQueue from its beginning and, for every entry whose key is not
// later than the current utcTime(), erases the referenced envelope from
// m_messages and the entry itself from the queue.
210 {
211  // remove old messages.
212  // should be called every now and again.
213  uint64_t now = utcTime();
// NOTE(review): listing line 214 is missing from this extract; presumably a
// write lock on x_messages -- confirm.
// The loop stops at the first entry with a key greater than `now`.
215  for (auto it = m_expiryQueue.begin(); it != m_expiryQueue.end() && it->first <= now; it = m_expiryQueue.erase(it))
216  m_messages.erase(it->second);
217 }
218 
// NOTE(review): the signature (listing line 219) is missing from this extract,
// so the name of this member function is not visible here.
// Calls noteAdvertiseTopicsOfInterest() on the WhisperPeer capability of every
// peer session.
220 {
221  for (auto i: peerSessions())
222  capabilityFromSession<WhisperPeer>(*i.first)->noteAdvertiseTopicsOfInterest();
223 }
224 
// Returns true when _e matches m_bloom and at least one filter in m_filters
// that matches _e is referenced by some watch in m_watches; false otherwise.
225 bool WhisperHost::isWatched(Envelope const& _e) const
226 {
// NOTE(review): listing line 227 is missing from this extract; presumably a
// guard on m_filterLock -- confirm.
// The bloom test runs first; filters are only examined when it passes.
228  if (_e.matchesBloomFilter(m_bloom))
229  for (auto const& f: m_filters)
230  if (f.second.filter.matches(_e))
231  for (auto const& i: m_watches)
232  if (i.second.id == f.first)
233  return true;
234  return false;
235 }
236 
// NOTE(review): the signature (listing line 237) is missing from this extract;
// the log messages below name the routine WhisperHost::saveMessagesToBD().
// Does nothing unless m_storeMessagesInDB is set. Otherwise passes every
// envelope in m_messages that has not yet expired and is watched to
// db.saveSingleMessage. Exceptions are reported through cwarn and not rethrown.
238 {
239  if (!m_storeMessagesInDB)
240  return;
241 
242  try
243  {
// NOTE(review): listing lines 244-245 are missing from this extract, so the
// declaration of `db` (and any lock taken here) is not visible.
246  uint64_t now = utcTime();
247  for (auto const& m: m_messages)
248  if (m.second.expiry() > now)
249  if (isWatched(m.second))
250  db.saveSingleMessage(m.first, m.second);
251  }
252  catch(FailedToOpenLevelDB const& ex)
253  {
254  cwarn << "Exception in WhisperHost::saveMessagesToBD() - failed to open DB:" << ex.what();
255  }
256  catch(Exception const& ex)
257  {
258  cwarn << "Exception in WhisperHost::saveMessagesToBD():" << ex.what();
259  }
260  catch(...)
261  {
262  cwarn << "Unknown Exception in WhisperHost::saveMessagesToBD()";
263  }
264 }
265 
// NOTE(review): the signature (listing line 266) is missing from this extract;
// the log messages below name the routine WhisperHost::loadMessagesFromBD().
// Does nothing unless m_storeMessagesInDB is set. Otherwise loads all stored
// envelopes through db.loadAllMessages, makes them the contents of m_messages
// and registers each of them in m_expiryQueue. Exceptions are reported through
// cwarn and not rethrown.
267 {
268  if (!m_storeMessagesInDB)
269  return;
270 
271  try
272  {
273  map<h256, Envelope> m;
// NOTE(review): listing lines 274 and 276 are missing from this extract, so the
// declaration of `db` and any lock taken before the swap are not visible.
275  db.loadAllMessages(m);
277  m_messages.swap(m);
// After the swap the loaded envelopes live in m_messages and `m` holds the
// previous contents, so the expiry queue must be filled from m_messages;
// iterating `m` here would leave the loaded envelopes without expiry entries.
278  for (auto const& msg: m_messages)
279  m_expiryQueue.insert(make_pair(msg.second.expiry(), msg.first));
280  }
281  catch(Exception const& ex)
282  {
283  cwarn << "Exception in WhisperHost::loadMessagesFromBD():" << ex.what();
284  }
285  catch(...)
286  {
287  cwarn << "Unknown Exception in WhisperHost::loadMessagesFromBD()";
288  }
289 }
TopicBloomFilterHash exportBloom() const
Definition: Common.cpp:98
void noteAdvertiseTopicsOfInterest()
std::map< unsigned, ClientWatch > m_watches
Definition: WhisperHost.h:85
Adapted from code found on http://stackoverflow.com/questions/180947/base64-decode-snippet-in-c Origi...
Definition: Arith256.cpp:15
h256 sha3(IncludeNonce _withNonce=WithNonce) const
Definition: Message.h:68
dev::SharedMutex x_messages
Definition: WhisperHost.h:79
virtual void inject(Envelope const &_e, WhisperPeer *_from=nullptr) override
Definition: WhisperHost.cpp:55
void removeRaw(FixedHash< N > const &_h)
Definition: BloomFilter.h:74
uint64_t utcTime()
Get the current time in seconds since the epoch in UTC.
Definition: Common.cpp:64
dev::Mutex m_filterLock
Definition: WhisperHost.h:83
void streamMessage(h256 _m, RLPStream &_s) const
Definition: WhisperHost.cpp:43
#define h(i)
Definition: sha.cpp:736
h256 sha3() const
Definition: Common.cpp:61
const char * what() const noexcept override
Definition: Exceptions.h:42
#define g(i)
Definition: sha.cpp:735
h256s Topics
Definition: Common.h:69
boost::upgrade_to_unique_lock< boost::shared_mutex > UpgradeGuard
Definition: Guards.h:46
std::map< h256, InstalledFilter > m_filters
Definition: WhisperHost.h:84
std::hash for asio::address
Definition: Common.h:323
Definition: Eth.h:45
boost::upgrade_lock< boost::shared_mutex > UpgradableGuard
Definition: Guards.h:45
virtual h256s watchMessages(unsigned _watchId) override
returns IDs of messages, which match specific watch criteria
void loadAllMessages(std::map< h256, Envelope > &o_dst)
Definition: WhisperDB.cpp:85
void cleanup()
remove old messages
#define DEV_GUARDED(MUTEX)
Simple block guard.
Definition: Guards.h:144
unsigned expiry() const
Definition: Message.h:74
Base class for all exceptions.
Definition: Exceptions.h:39
std::lock_guard< std::mutex > Guard
Definition: Guards.h:41
TopicBloomFilter m_bloom
Definition: WhisperHost.h:86
virtual void uninstallWatch(unsigned _watchId) override
void saveSingleMessage(dev::h256 const &_key, Envelope const &_e)
Definition: WhisperDB.cpp:141
#define cwarn
Definition: Log.h:304
boost::shared_lock< boost::shared_mutex > ReadGuard
Definition: Guards.h:44
unsigned workProved() const
Definition: Message.cpp:155
virtual unsigned installWatch(Topics const &_filter) override
bool isExpired() const
Definition: Message.h:80
boost::unique_lock< boost::shared_mutex > WriteGuard
Definition: Guards.h:47
#define f(x)
Definition: gost.cpp:57
#define cwatshh
Definition: Interface.h:84
void addRaw(FixedHash< N > const &_h)
Definition: BloomFilter.h:60
virtual void doWork() override
Called continuously following sleep for m_idleWaitMs.
bool matches(Envelope const &_m) const
Definition: Common.cpp:68
bool isWatched(Envelope const &_e) const
bool m_storeMessagesInDB
needed for tests and other special cases
Definition: WhisperHost.h:88
bool matchesBloomFilter(TopicBloomFilterHash const &f) const
Definition: Message.cpp:184
unsigned ttl() const
Definition: Message.h:75
std::multimap< unsigned, h256 > m_expiryQueue
Definition: WhisperHost.h:81
std::map< h256, Envelope > m_messages
Definition: WhisperHost.h:80
virtual h256s checkWatch(unsigned _watchId) override
std::vector< h256 > h256s
Definition: FixedHash.h:345
Class for writing to an RLP bytestream.
Definition: RLP.h:383