Fabcoin Core  0.16.2
P2P Digital Currency
TransactionQueue.cpp
Go to the documentation of this file.
1 /*
2  This file is part of cpp-ethereum.
3 
4  cpp-ethereum is free software: you can redistribute it and/or modify
5  it under the terms of the GNU General Public License as published by
6  the Free Software Foundation, either version 3 of the License, or
7  (at your option) any later version.
8 
9  cpp-ethereum is distributed in the hope that it will be useful,
10  but WITHOUT ANY WARRANTY; without even the implied warranty of
11  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12  GNU General Public License for more details.
13 
14  You should have received a copy of the GNU General Public License
15  along with cpp-ethereum. If not, see <http://www.gnu.org/licenses/>.
16 */
22 #include "TransactionQueue.h"
23 
24 #include <libdevcore/Log.h>
25 #include <libethcore/Exceptions.h>
26 #include "Transaction.h"
27 using namespace std;
28 using namespace dev;
29 using namespace dev::eth;
30 
31 const char* TransactionQueueChannel::name() { return EthCyan "┉┅▶"; }
32 const char* TransactionQueueTraceChannel::name() { return EthCyan " ┅▶"; }
33 
34 const size_t c_maxVerificationQueueSize = 8192;
35 
36 TransactionQueue::TransactionQueue(unsigned _limit, unsigned _futureLimit):
37  m_current(PriorityCompare { *this }),
38  m_limit(_limit),
39  m_futureLimit(_futureLimit)
40 {
41  unsigned verifierThreads = std::max(thread::hardware_concurrency(), 3U) - 2U;
42  for (unsigned i = 0; i < verifierThreads; ++i)
43  m_verifiers.emplace_back([=](){
44  setThreadName("txcheck" + toString(i));
45  this->verifierBody();
46  });
47 }
48 
50 {
52  m_aborting = true;
53  m_queueReady.notify_all();
54  for (auto& i: m_verifiers)
55  i.join();
56 }
57 
59 {
60  // Check if we already know this transaction.
61  h256 h = sha3(_transactionRLP);
62 
63  Transaction t;
64  ImportResult ir;
65  {
67 
68  ir = check_WITH_LOCK(h, _ik);
69  if (ir != ImportResult::Success)
70  return ir;
71 
72  try
73  {
74  // Check validity of _transactionRLP as a transaction. To do this we just deserialise and attempt to determine the sender.
75  // If it doesn't work, the signature is bad.
76  // The transaction's nonce may yet be invalid (or, it could be "valid" but we may be missing a marginally older transaction).
77  t = Transaction(_transactionRLP, CheckTransaction::Everything);
78  UpgradeGuard ul(l);
79 // cdebug << "Importing" << t;
80  ir = manageImport_WITH_LOCK(h, t);
81  }
82  catch (...)
83  {
85  }
86  }
87  return ir;
88 }
89 
91 {
92  if (m_known.count(_h))
94 
95  if (m_dropped.count(_h) && _ik == IfDropped::Ignore)
97 
98  return ImportResult::Success;
99 }
100 
102 {
103  // Check if we already know this transaction.
104  h256 h = _transaction.sha3(WithSignature);
105 
106  ImportResult ret;
107  {
109  auto ir = check_WITH_LOCK(h, _ik);
110  if (ir != ImportResult::Success)
111  return ir;
112 
113  {
114  _transaction.safeSender(); // Perform EC recovery outside of the write lock
115  UpgradeGuard ul(l);
116  ret = manageImport_WITH_LOCK(h, _transaction);
117  }
118  }
119  return ret;
120 }
121 
122 Transactions TransactionQueue::topTransactions(unsigned _limit, h256Hash const& _avoid) const
123 {
124  ReadGuard l(m_lock);
125  Transactions ret;
126  for (auto t = m_current.begin(); ret.size() < _limit && t != m_current.end(); ++t)
127  if (!_avoid.count(t->transaction.sha3()))
128  ret.push_back(t->transaction);
129  return ret;
130 }
131 
133 {
134  ReadGuard l(m_lock);
135  return m_known;
136 }
137 
139 {
140  try
141  {
142  assert(_h == _transaction.sha3());
143  // Remove any prior transaction with the same nonce but a lower gas price.
144  // Bomb out if there's a prior transaction with higher gas price.
145  auto cs = m_currentByAddressAndNonce.find(_transaction.from());
146  if (cs != m_currentByAddressAndNonce.end())
147  {
148  auto t = cs->second.find(_transaction.nonce());
149  if (t != cs->second.end())
150  {
151  if (_transaction.gasPrice() < (*t->second).transaction.gasPrice())
153  else
154  {
155  h256 dropped = (*t->second).transaction.sha3();
156  remove_WITH_LOCK(dropped);
157  m_onReplaced(dropped);
158  }
159  }
160  }
161  auto fs = m_future.find(_transaction.from());
162  if (fs != m_future.end())
163  {
164  auto t = fs->second.find(_transaction.nonce());
165  if (t != fs->second.end())
166  {
167  if (_transaction.gasPrice() < t->second.transaction.gasPrice())
169  else
170  {
171  fs->second.erase(t);
172  --m_futureSize;
173  if (fs->second.empty())
174  m_future.erase(fs);
175  }
176  }
177  }
178  // If valid, append to transactions.
179  insertCurrent_WITH_LOCK(make_pair(_h, _transaction));
180  clog(TransactionQueueTraceChannel) << "Queued vaguely legit-looking transaction" << _h;
181 
182  while (m_current.size() > m_limit)
183  {
184  clog(TransactionQueueTraceChannel) << "Dropping out of bounds transaction" << _h;
185  remove_WITH_LOCK(m_current.rbegin()->transaction.sha3());
186  }
187 
188  m_onReady();
189  }
190  catch (Exception const& _e)
191  {
192  ctxq << "Ignoring invalid transaction: " << diagnostic_information(_e);
194  }
195  catch (std::exception const& _e)
196  {
197  ctxq << "Ignoring invalid transaction: " << _e.what();
199  }
200 
201  return ImportResult::Success;
202 }
203 
205 {
206  ReadGuard l(m_lock);
207  return maxNonce_WITH_LOCK(_a);
208 }
209 
211 {
212  u256 ret = 0;
213  auto cs = m_currentByAddressAndNonce.find(_a);
214  if (cs != m_currentByAddressAndNonce.end() && !cs->second.empty())
215  ret = cs->second.rbegin()->first + 1;
216  auto fs = m_future.find(_a);
217  if (fs != m_future.end() && !fs->second.empty())
218  ret = std::max(ret, fs->second.rbegin()->first + 1);
219  return ret;
220 }
221 
222 void TransactionQueue::insertCurrent_WITH_LOCK(std::pair<h256, Transaction> const& _p)
223 {
224  if (m_currentByHash.count(_p.first))
225  {
226  cwarn << "Transaction hash" << _p.first << "already in current?!";
227  return;
228  }
229 
230  Transaction const& t = _p.second;
231  // Insert into current
232  auto inserted = m_currentByAddressAndNonce[t.from()].insert(std::make_pair(t.nonce(), PriorityQueue::iterator()));
233  PriorityQueue::iterator handle = m_current.emplace(VerifiedTransaction(t));
234  inserted.first->second = handle;
235  m_currentByHash[_p.first] = handle;
236 
237  // Move following transactions from future to current
239  m_known.insert(_p.first);
240 }
241 
243 {
244  auto t = m_currentByHash.find(_txHash);
245  if (t == m_currentByHash.end())
246  return false;
247 
248  Address from = (*t->second).transaction.from();
249  auto it = m_currentByAddressAndNonce.find(from);
250  assert (it != m_currentByAddressAndNonce.end());
251  it->second.erase((*t->second).transaction.nonce());
252  m_current.erase(t->second);
253  m_currentByHash.erase(t);
254  if (it->second.empty())
255  m_currentByAddressAndNonce.erase(it);
256  m_known.erase(_txHash);
257  return true;
258 }
259 
260 unsigned TransactionQueue::waiting(Address const& _a) const
261 {
262  ReadGuard l(m_lock);
263  unsigned ret = 0;
264  auto cs = m_currentByAddressAndNonce.find(_a);
265  if (cs != m_currentByAddressAndNonce.end())
266  ret = cs->second.size();
267  auto fs = m_future.find(_a);
268  if (fs != m_future.end())
269  ret += fs->second.size();
270  return ret;
271 }
272 
273 void TransactionQueue::setFuture(h256 const& _txHash)
274 {
275  WriteGuard l(m_lock);
276  auto it = m_currentByHash.find(_txHash);
277  if (it == m_currentByHash.end())
278  return;
279 
280  VerifiedTransaction const& st = *(it->second);
281 
282  Address from = st.transaction.from();
283  auto& queue = m_currentByAddressAndNonce[from];
284  auto& target = m_future[from];
285  auto cutoff = queue.lower_bound(st.transaction.nonce());
286  for (auto m = cutoff; m != queue.end(); ++m)
287  {
288  VerifiedTransaction& t = const_cast<VerifiedTransaction&>(*(m->second)); // set has only const iterators. Since we are moving out of container that's fine
289  m_currentByHash.erase(t.transaction.sha3());
290  target.emplace(t.transaction.nonce(), move(t));
291  m_current.erase(m->second);
292  ++m_futureSize;
293  }
294  queue.erase(cutoff, queue.end());
295  if (queue.empty())
296  m_currentByAddressAndNonce.erase(from);
297 }
298 
300 {
301  bool newCurrent = false;
302  auto fs = m_future.find(_t.from());
303  if (fs != m_future.end())
304  {
305  u256 nonce = _t.nonce() + 1;
306  auto fb = fs->second.find(nonce);
307  if (fb != fs->second.end())
308  {
309  auto ft = fb;
310  while (ft != fs->second.end() && ft->second.transaction.nonce() == nonce)
311  {
312  auto inserted = m_currentByAddressAndNonce[_t.from()].insert(std::make_pair(ft->second.transaction.nonce(), PriorityQueue::iterator()));
313  PriorityQueue::iterator handle = m_current.emplace(move(ft->second));
314  inserted.first->second = handle;
315  m_currentByHash[(*handle).transaction.sha3()] = handle;
316  --m_futureSize;
317  ++ft;
318  ++nonce;
319  newCurrent = true;
320  }
321  fs->second.erase(fb, ft);
322  if (fs->second.empty())
323  m_future.erase(_t.from());
324  }
325  }
326 
327  while (m_futureSize > m_futureLimit)
328  {
329  // TODO: priority queue for future transactions
330  // For now just drop random chain end
331  --m_futureSize;
332  clog(TransactionQueueTraceChannel) << "Dropping out of bounds future transaction" << m_future.begin()->second.rbegin()->second.transaction.sha3();
333  m_future.begin()->second.erase(--m_future.begin()->second.end());
334  if (m_future.begin()->second.empty())
335  m_future.erase(m_future.begin());
336  }
337 
338  if (newCurrent)
339  m_onReady();
340 }
341 
342 void TransactionQueue::drop(h256 const& _txHash)
343 {
345 
346  if (!m_known.count(_txHash))
347  return;
348 
349  UpgradeGuard ul(l);
350  m_dropped.insert(_txHash);
351  remove_WITH_LOCK(_txHash);
352 }
353 
355 {
356  WriteGuard l(m_lock);
358  if (!m_known.count(_t.sha3()))
359  return;
360  remove_WITH_LOCK(_t.sha3());
361 }
362 
364 {
365  WriteGuard l(m_lock);
366  m_known.clear();
367  m_current.clear();
369  m_currentByHash.clear();
370  m_future.clear();
371  m_futureSize = 0;
372 }
373 
374 void TransactionQueue::enqueue(RLP const& _data, h512 const& _nodeId)
375 {
376  bool queued = false;
377  {
378  Guard l(x_queue);
379  unsigned itemCount = _data.itemCount();
380  for (unsigned i = 0; i < itemCount; ++i)
381  {
383  {
384  clog(TransactionQueueChannel) << "Transaction verification queue is full. Dropping" << itemCount - i << "transactions";
385  break;
386  }
387  m_unverified.emplace_back(UnverifiedTransaction(_data[i].data(), _nodeId));
388  queued = true;
389  }
390  }
391  if (queued)
392  m_queueReady.notify_all();
393 }
394 
396 {
397  while (!m_aborting)
398  {
400 
401  {
402  unique_lock<Mutex> l(x_queue);
403  m_queueReady.wait(l, [&](){ return !m_unverified.empty() || m_aborting; });
404  if (m_aborting)
405  return;
406  work = move(m_unverified.front());
407  m_unverified.pop_front();
408  }
409 
410  try
411  {
412  Transaction t(work.transaction, CheckTransaction::Cheap); //Signature will be checked later
413  ImportResult ir = import(t);
414  m_onImport(ir, t.sha3(), work.nodeId);
415  }
416  catch (...)
417  {
418  // should not happen as exceptions are handled in import.
419  cwarn << "Bad transaction:" << boost::current_exception_diagnostic_information();
420  }
421  }
422 }
SharedMutex m_lock
General lock.
unsigned m_limit
Max number of pending transactions.
Adapted from code found on http://stackoverflow.com/questions/180947/base64-decode-snippet-in-c Origi...
Definition: Arith256.cpp:15
h256Hash m_dropped
Transactions that have previously been dropped.
Address from() const
Synonym for safeSender().
Definition: Transaction.h:128
std::unordered_map< Address, std::map< u256, VerifiedTransaction > > m_future
std::unordered_map< h256, PriorityQueue::iterator > m_currentByHash
Transaction hash to set ref.
std::atomic< bool > m_aborting
Exit condition for verifier.
void enqueue(RLP const &_data, h512 const &_nodeId)
Add transaction to the queue to be verified and imported.
Do include a signature.
Definition: Transaction.h:39
#define EthCyan
Definition: Terminal.h:131
size_t itemCount() const
Definition: RLP.h:118
unsigned m_futureLimit
Max number of future transactions.
#define h(i)
Definition: sha.cpp:736
std::vector< Transaction > Transactions
Nice name for vector of Transaction.
Definition: Transaction.h:121
std::vector< std::thread > m_verifiers
void drop(h256 const &_txHash)
Remove transaction from the queue.
boost::upgrade_to_unique_lock< boost::shared_mutex > UpgradeGuard
Definition: Guards.h:46
std::hash for asio::adress
Definition: Common.h:323
assert(len-trim+(2 *lenIndices)<=WIDTH)
Signal< ImportResult, h256 const &, h512 const & > m_onImport
Called for each import attempt. Arguments are result, transaction id an node id. Be nice and exit fas...
Mutex x_queue
Verification queue mutex.
#define ctxq
Signal< h256 const & > m_onReplaced
Called whan transction is dropped during a call to import() to make room for another transaction...
boost::upgrade_lock< boost::shared_mutex > UpgradableGuard
Definition: Guards.h:45
bytes transaction
RLP encoded transaction data.
ImportResult manageImport_WITH_LOCK(h256 const &_h, Transaction const &_transaction)
u256 maxNonce_WITH_LOCK(Address const &_a) const
ImportResult
Definition: Common.h:97
void makeCurrent_WITH_LOCK(Transaction const &_t)
Transactions topTransactions(unsigned _limit, h256Hash const &_avoid=h256Hash()) const
Get top transactions from the queue.
#define DEV_GUARDED(MUTEX)
Simple block guard.
Definition: Guards.h:144
Base class for all exceptions.
Definition: Exceptions.h:39
Verified and imported transaction.
std::deque< UnverifiedTransaction > m_unverified
Pending verification queue.
h256Hash m_known
Headers of transactions in both sets.
IfDropped
Import transaction policy.
Definition: Common.h:214
std::lock_guard< std::mutex > Guard
Definition: Guards.h:41
unsigned m_futureSize
Current number of future transactions.
const char * name
Definition: rest.cpp:36
void clear()
Clear the queue.
Don&#39;t import transaction that was previously dropped.
h256 sha3(IncludeSignature _sig=WithSignature) const
#define cwarn
Definition: Log.h:304
std::unordered_map< Address, std::map< u256, PriorityQueue::iterator > > m_currentByAddressAndNonce
Transactions grouped by account and nonce.
ImportResult check_WITH_LOCK(h256 const &_h, IfDropped _ik)
boost::multiprecision::number< boost::multiprecision::cpp_int_backend< 256, 256, boost::multiprecision::unsigned_magnitude, boost::multiprecision::unchecked, void >> u256
Definition: Common.h:125
boost::shared_lock< boost::shared_mutex > ReadGuard
Definition: Guards.h:44
ImportResult import(bytes const &_tx, IfDropped _ik=IfDropped::Ignore)
Verify and add transaction to the queue synchronously.
Encodes a transaction, ready to be exported to or freshly imported from RLP.
Definition: Transaction.h:84
bool remove_WITH_LOCK(h256 const &_txHash)
boost::unique_lock< boost::shared_mutex > WriteGuard
Definition: Guards.h:47
Address const & safeSender() const noexcept
Like sender() but will never throw.
Definition: Transaction.cpp:98
void setFuture(h256 const &_t)
Mark transaction as future.
void dropGood(Transaction const &_t)
Drop a trasnaction from the list if exists and move following future trasnactions to current (if any)...
#define fb(x)
Definition: rijndael.cpp:171
u256 maxNonce(Address const &_a) const
Get max nonce for an account.
bool sha3(bytesConstRef _input, bytesRef o_output)
Calculate SHA3-256 hash of the given input and load it into the given output.
Definition: SHA3.cpp:214
#define clog(X)
Definition: Log.h:295
h512 nodeId
Network Id of the peer transaction comes from.
std::unordered_set< h256 > h256Hash
Definition: FixedHash.h:349
dev::WithExisting max(dev::WithExisting _a, dev::WithExisting _b)
Definition: Common.h:326
void insertCurrent_WITH_LOCK(std::pair< h256, Transaction > const &_p)
Signal m_onReady
Future transactions.
std::condition_variable m_queueReady
Signaled when m_unverified has a new entry.
h256Hash knownTransactions() const
Get a hash set of transactions in the queue.
unsigned waiting(Address const &_a) const
Get number of pending transactions for account.
uint8_t const * data
Definition: sha3.h:19
Class for interpreting Recursive Linear-Prefix Data.
Definition: RLP.h:64
const size_t c_maxVerificationQueueSize