22 #include "Client.h"
23 #include <chrono>
24 #include <memory>
25 #include <thread>
26 #include <boost/filesystem.hpp>
27 #include <libdevcore/Log.h>
28 #include <libp2p/Host.h>
29 #include "Defaults.h"
30 #include "Executive.h"
31 #include "EthereumHost.h"
32 #include "Block.h"
33 #include "TransactionQueue.h"
34 using namespace std;
35 using namespace dev;
36 using namespace dev::eth;
37 using namespace p2p;
39 static_assert(BOOST_VERSION == 106300, "Wrong boost headers version");
41 std::ostream& dev::eth::operator<<(std::ostream& _out, ActivityReport const& _r)
42 {
43  _out << "Since " << toString(_r.since) << " (" << std::chrono::duration_cast<std::chrono::seconds>(std::chrono::system_clock::now() - _r.since).count();
44  _out << "): " << _r.ticks << "ticks";
45  return _out;
46 }
// Display prefixes returned by name() for the ClientNote, ClientChat, ClientTrace and
// ClientDetail types (used with clog() elsewhere in this file). The _WIN32 branch uses
// ASCII-only glyphs; the other branch uses Unicode symbols.
// NOTE(review): EthTeal/EthBlue/EthWhite/EthGray/EthCoal are presumably string-literal
// macros defined elsewhere, concatenated with the adjacent literals — confirm.
48 #if defined(_WIN32)
49 const char* ClientNote::name() { return EthTeal "^" EthBlue " i"; }
50 const char* ClientChat::name() { return EthTeal "^" EthWhite " o"; }
51 const char* ClientTrace::name() { return EthTeal "^" EthGray " O"; }
52 const char* ClientDetail::name() { return EthTeal "^" EthCoal " 0"; }
53 #else
54 const char* ClientNote::name() { return EthTeal "⧫" EthBlue " ℹ"; }
55 const char* ClientChat::name() { return EthTeal "⧫" EthWhite " ◌"; }
56 const char* ClientTrace::name() { return EthTeal "⧫" EthGray " ◎"; }
57 const char* ClientDetail::name() { return EthTeal "⧫" EthCoal " ●"; }
58 #endif
/// Constructs the client: forwards the transaction-queue limits to ClientBase, sets up
/// the Worker named "eth", constructs m_bc with a progress callback that prints to
/// std::cerr, picks the gas pricer (a TrivialGasPricer when _gpForAdoption is null) and
/// then calls init() with the host, database path, force action and network id.
/// NOTE(review): in this listing the member-initialiser list ends with a trailing comma,
/// so further initialisers appear to be missing here — confirm against the full file.
60 Client::Client(
61  ChainParams const& _params,
62  int _networkID,
63  p2p::Host* _host,
64  std::shared_ptr<GasPricer> _gpForAdoption,
65  std::string const& _dbPath,
66  WithExisting _forceAction,
67  TransactionQueue::Limits const& _l
68 ):
69  ClientBase(_l),
70  Worker("eth", 0),
71  m_bc(_params, _dbPath, _forceAction, [](unsigned d, unsigned t){ std::cerr << "REVISING BLOCKCHAIN: Processed " << d << " of " << t << "...\r"; }),
72  m_gp(_gpForAdoption ? _gpForAdoption : make_shared<TrivialGasPricer>()),
76 {
77  init(_host, _dbPath, _forceAction, _networkID);
78 }
// NOTE(review): the signature line for this body is absent from this listing
// (presumably the Client destructor — confirm). The body only calls stopWorking().
81 {
82  stopWorking();
83 }
85 void Client::init(p2p::Host* _extNet, std::string const& _dbPath, WithExisting _forceAction, u256 _networkId)
86 {
89  // Cannot be opened until after blockchain is open, since BlockChain may upgrade the database.
90  // TODO: consider returning the upgrade mechanism here. will delaying the opening of the blockchain database
91  // until after the construction.
92  m_stateDB = State::openDB(_dbPath, bc().genesisHash(), _forceAction);
93  // LAZY. TODO: move genesis state construction/commiting to stateDB openning and have this just take the root from the genesis block.
97  m_bq.setChain(bc());
99  m_lastGetWork = std::chrono::system_clock::now() - chrono::seconds(30);
100  m_tqReady = m_tq.onReady([=](){ this->onTransactionQueueReady(); }); // TODO: should read m_tq->onReady(thisThread, syncTransactionQueue);
101  m_tqReplaced = m_tq.onReplaced([=](h256 const&){ m_needStateReset = true; });
102  m_bqReady = m_bq.onReady([=](){ this->onBlockQueueReady(); }); // TODO: should read m_bq->onReady(thisThread, syncBlockQueue);
103  m_bq.setOnBad([=](Exception& ex){ this->onBadBlock(ex); });
104  bc().setOnBad([=](Exception& ex){ this->onBadBlock(ex); });
105  bc().setOnBlockImport([=](BlockHeader const& _info){
106  if (auto h = m_host.lock())
107  h->onBlockImported(_info);
108  });
110  if (_forceAction == WithExisting::Rescue)
111  bc().rescue(m_stateDB);
113  m_gp->update(bc());
115  auto host = _extNet->registerCapability(make_shared<EthereumHost>(bc(), m_stateDB, m_tq, m_bq, _networkId));
116  m_host = host;
118  _extNet->addCapability(host, EthereumHost::staticName(), EthereumHost::c_oldProtocolVersion); //TODO: remove this once v61+ protocol is common
121  if (_dbPath.size())
122  Defaults::setDBPath(_dbPath);
123  doWork(false);
124  startWorking();
125 }
127 ImportResult Client::queueBlock(bytes const& _block, bool _isSafe)
128 {
129  if (m_bq.status().verified + m_bq.status().verifying + m_bq.status().unverified > 10000)
130  this_thread::sleep_for(std::chrono::milliseconds(500));
131  return m_bq.import(&_block, _isSafe);
132 }
134 tuple<ImportRoute, bool, unsigned> Client::syncQueue(unsigned _max)
135 {
136  stopWorking();
137  return bc().sync(m_bq, m_stateDB, _max);
138 }
// NOTE(review): the signature line is absent from this listing (presumably the handler
// invoked elsewhere in this file as onBadBlock(ex) — confirm).
// Looks up the block bytes attached to _ex as errinfo_block. If none are attached it
// logs two warnings and returns; otherwise it passes the block and the exception text
// to badBlock().
141 {
142  // BAD BLOCK!!!
143  bytes const* block = boost::get_error_info<errinfo_block>(_ex);
144  if (!block)
145  {
146  cwarn << "ODD: onBadBlock called but exception (" << _ex.what() << ") has no block in it.";
147  cwarn << boost::diagnostic_information(_ex);
148  return;
149  }
151  badBlock(*block, _ex.what());
152 }
// NOTE(review): the signature line is absent from this listing — confirm the function name.
// Repeatedly removes the front entry of m_functionQueue and invokes it; the loop ends
// once the queue is empty (or an empty std::function is dequeued).
// NOTE(review): the embedded numbering skips a line just before the empty() check,
// possibly a lock guard — confirm against the full file.
155 {
156  while (true)
157  {
158  function<void()> f;
160  if (!m_functionQueue.empty())
161  {
162  f = m_functionQueue.front();
163  m_functionQueue.pop();
164  }
165  if (f)
166  f();
167  else
168  break;
169  }
170 }
// NOTE(review): the signature line is absent from this listing (presumably the
// network-id getter — confirm).
// Returns the host's networkId(), or 0 when m_host can no longer be locked.
173 {
174  if (auto h = m_host.lock())
175  return h->networkId();
176  return 0;
177 }
179 void Client::setNetworkId(u256 const& _n)
180 {
181  if (auto h = m_host.lock())
182  h->setNetworkId(_n);
183 }
185 bool Client::isSyncing() const
186 {
187  if (auto h = m_host.lock())
188  return h->isSyncing();
189  return false;
190 }
// NOTE(review): the signature line is absent from this listing (presumably
// isMajorSyncing(), which is called elsewhere in this file — confirm).
// Returns true when the host's sync state is neither Idle nor NewBlocks, or when the
// first component of the host block queue's items() exceeds 10. Returns false when
// m_host can no longer be locked.
193 {
194  if (auto h = m_host.lock())
195  {
196  SyncState state = h->status().state;
197  return (state != SyncState::Idle && state != SyncState::NewBlocks) || h->bq().items().first > 10;
198  }
199  return false;
200 }
// NOTE(review): the signature line is absent from this listing (presumably
// startedWorking(), per the log message — confirm).
// Logs "startedWorking()" on the ClientTrace channel and syncs m_preSeal with the chain.
// NOTE(review): the trailing brace pair is empty in this listing; its contents appear
// to be missing — confirm against the full file.
203 {
204  // Synchronise the state according to the head of the block chain.
205  // TODO: currently it contains keys for *all* blocks. Make it remove old ones.
206  clog(ClientTrace) << "startedWorking()";
209  m_preSeal.sync(bc());
211  {
216  }
217 }
// NOTE(review): the signature line is absent from this listing — confirm the function name.
// Syncs m_preSeal with the chain.
// NOTE(review): the trailing brace pair is empty in this listing; its contents appear
// to be missing — confirm against the full file.
220 {
221  // Synchronise the state according to the head of the block chain.
222  // TODO: currently it contains keys for *all* blocks. Make it remove old ones.
224  m_preSeal.sync(bc());
226  {
231  }
232 }
// NOTE(review): the signature line is absent from this listing (presumably a
// reopenChain overload taking only the WithExisting argument _we — confirm).
// Delegates to the two-argument reopenChain(), reusing the chain's current parameters.
235 {
236  reopenChain(bc().chainParams(), _we);
237 }
// NOTE(review): the signature line is absent from this listing (presumably
// reopenChain(_p, _we) — confirm).
// Sequence shown here: remember whether sealing was requested and stop it, stop the
// worker, clear both queues, then (holding the x_preSeal and x_working write guards)
// reset the three Block members, drop and reopen the state database around
// bc().reopen(). Afterwards the host is reset, the worker restarted, and sealing
// resumed if it had been active.
// NOTE(review): `author` is saved but never restored in this listing; the restoring
// lines (and another guard) appear to be missing — confirm against the full file.
240 {
241  bool wasSealing = wouldSeal();
242  if (wasSealing)
243  stopSealing();
244  stopWorking();
246  m_tq.clear();
247  m_bq.clear();
250  {
252  WriteGuard l2(x_preSeal);
253  WriteGuard l3(x_working);
255  auto author = m_preSeal.author(); // backup and restore author.
256  m_preSeal = Block(chainParams().accountStartNonce);
257  m_postSeal = Block(chainParams().accountStartNonce);
258  m_working = Block(chainParams().accountStartNonce);
260  m_stateDB = OverlayDB();
261  bc().reopen(_p, _we);
262  m_stateDB = State::openDB(Defaults::dbPath(), bc().genesisHash(), _we);
267  m_working = Block(chainParams().accountStartNonce);
268  }
270  if (auto h = m_host.lock())
271  h->reset();
273  startedWorking();
274  doWork();
276  startWorking();
277  if (wasSealing)
278  startSealing();
279 }
281 void Client::executeInMainThread(function<void ()> const& _function)
282 {
284  m_functionQueue.push(_function);
285  m_signalled.notify_all();
286 }
// NOTE(review): the signature line is absent from this listing — confirm the function name.
// Returns immediately when m_postSeal has no pending transactions. Otherwise clears the
// transaction queue, calls startSealing() and then noteChanged() with an empty set.
// NOTE(review): the embedded numbering skips lines inside the inner block (possibly
// guards and state resets) — confirm against the full file.
289 {
291  {
292  if (!m_postSeal.pending().size())
293  return;
294  m_tq.clear();
297  }
299  startSealing();
300  h256Hash changeds;
301  noteChanged(changeds);
302 }
304 template <class S, class T>
305 static S& filtersStreamOut(S& _out, T const& _fs)
306 {
307  _out << "{";
308  unsigned i = 0;
309  for (h256 const& f: _fs)
310  {
311  _out << (i++ ? ", " : "");
312  if (f == PendingChangedFilter)
313  _out << LogTag::Special << "pending";
314  else if (f == ChainChangedFilter)
315  _out << LogTag::Special << "chain";
316  else
317  _out << f;
318  }
319  _out << "}";
320  return _out;
321 }
323 void Client::appendFromNewPending(TransactionReceipt const& _receipt, h256Hash& io_changed, h256 _sha3)
324 {
326  io_changed.insert(PendingChangedFilter);
327  m_specialFilters.at(PendingChangedFilter).push_back(_sha3);
328  for (pair<h256 const, InstalledFilter>& i: m_filters)
329  {
330  // acceptable number.
331  auto m = i.second.filter.matches(_receipt);
332  if (m.size())
333  {
334  // filter catches them
335  for (LogEntry const& l: m)
336  i.second.changes.push_back(LocalisedLogEntry(l));
337  io_changed.insert(i.first);
338  }
339  }
340 }
342 void Client::appendFromBlock(h256 const& _block, BlockPolarity _polarity, h256Hash& io_changed)
343 {
344  // TODO: more precise check on whether the txs match.
345  auto receipts = bc().receipts(_block).receipts;
348  io_changed.insert(ChainChangedFilter);
349  m_specialFilters.at(ChainChangedFilter).push_back(_block);
350  for (pair<h256 const, InstalledFilter>& i: m_filters)
351  {
352  // acceptable number & looks like block may contain a matching log entry.
353  for (size_t j = 0; j < receipts.size(); j++)
354  {
355  auto tr = receipts[j];
356  auto m = i.second.filter.matches(tr);
357  if (m.size())
358  {
359  auto transactionHash = transaction(_block, j).sha3();
360  // filter catches them
361  for (LogEntry const& l: m)
362  i.second.changes.push_back(LocalisedLogEntry(l, _block, (BlockNumber)bc().number(_block), transactionHash, j, 0, _polarity));
363  io_changed.insert(i.first);
364  }
365  }
366  }
367 }
369 ExecutionResult Client::call(Address _dest, bytes const& _data, u256 _gas, u256 _value, u256 _gasPrice, Address const& _from)
370 {
371  ExecutionResult ret;
372  try
373  {
374  Block temp(chainParams().accountStartNonce);
375  clog(ClientDetail) << "Nonce at " << _dest << " pre:" << m_preSeal.transactionsFrom(_dest) << " post:" << m_postSeal.transactionsFrom(_dest);
377  temp = m_postSeal;
378  temp.mutableState().addBalance(_from, _value + _gasPrice * _gas);
379  Executive e(temp);
380  e.setResultRecipient(ret);
381  if (!e.call(_dest, _from, _value, _gasPrice, &_data, _gas))
382  e.go();
383  e.finalize();
384  }
385  catch (...)
386  {
387  cwarn << "Client::call failed: " << boost::current_exception_diagnostic_information();
388  }
389  return ret;
390 }
// Tuning constants for the adaptive batch size (m_syncAmount) used later in this file:
// c_syncMin and c_syncMax are the lower and upper clamps applied via max()/min(), and
// c_targetDuration is compared, with 10% margins either side, against the measured
// duration of one sync call.
// NOTE(review): the duration is presumably in seconds (the measured value is
// multiplied by 1000 and logged as "ms") — confirm against Timer.
392 unsigned static const c_syncMin = 1;
393 unsigned static const c_syncMax = 1000;
394 double static const c_targetDuration = 1;
// NOTE(review): the signature line is absent from this listing (presumably
// syncBlockQueue(), per the commented-out trace — confirm).
// Imports up to m_syncAmount blocks from the block queue via bc().sync(), storing the
// second tuple element into m_syncBlockQueue, and logs the throughput when any block
// was imported. m_syncAmount is then adapted: reduced to 90% of the imported count
// (not below c_syncMin) when the call took more than 110% of c_targetDuration, or
// grown by about 10% plus one (not above c_syncMax) when the full batch was imported
// in under 90% of it. If the import route has live blocks, onChainChanged() is called.
397 {
398 // cdebug << "syncBlockQueue()";
400  ImportRoute ir;
401  unsigned count;
402  Timer t;
403  tie(ir, m_syncBlockQueue, count) = bc().sync(m_bq, m_stateDB, m_syncAmount);
404  double elapsed = t.elapsed();
406  if (count)
407  {
408  clog(ClientNote) << count << "blocks imported in" << unsigned(elapsed * 1000) << "ms (" << (count / elapsed) << "blocks/s) in #" << bc().number();
409  }
411  if (elapsed > c_targetDuration * 1.1 && count > c_syncMin)
412  m_syncAmount = max(c_syncMin, count * 9 / 10);
413  else if (count == m_syncAmount && elapsed < c_targetDuration * 0.9 && m_syncAmount < c_syncMax)
414  m_syncAmount = min(c_syncMax, m_syncAmount * 11 / 10 + 1);
415  if (ir.liveBlocks.empty())
416  return;
417  onChainChanged(ir);
418 }
// NOTE(review): the signature line is absent from this listing (presumably the
// transaction-queue sync routine — confirm).
// Skips the sync when m_working is already sealed. Otherwise syncs m_working with the
// transaction queue using the gas pricer; with no new receipts it logs the queue
// status and returns. For each new receipt it calls appendFromNewPending() with the
// hash of the same-index pending transaction of m_postSeal, then notifies watches via
// noteChanged() and the host via noteNewTransactions(), and logs the elapsed time.
// NOTE(review): the embedded numbering skips several lines (e.g. before the first
// inner block and before the receipts loop), so guards and state updates may be
// missing from this listing — confirm against the full file.
421 {
422  Timer timer;
424  h256Hash changeds;
425  TransactionReceipts newPendingReceipts;
428  {
429  if (m_working.isSealed())
430  {
431  ctrace << "Skipping txq sync for a sealed block.";
432  return;
433  }
435  tie(newPendingReceipts, m_syncTransactionQueue) = m_working.sync(bc(), m_tq, *m_gp);
436  }
438  if (newPendingReceipts.empty())
439  {
440  auto s = m_tq.status();
441  ctrace << "No transactions to process. " << m_working.pending().size() << " pending, " << s.current << " queued, " << s.future << " future, " << s.unverified << " unverified";
442  return;
443  }
450  for (size_t i = 0; i < newPendingReceipts.size(); i++)
451  appendFromNewPending(newPendingReceipts[i], changeds, m_postSeal.pending()[i].sha3());
453  // Tell farm about new transaction (i.e. restart mining).
456  // Tell watches about the new transactions.
457  noteChanged(changeds);
459  // Tell network about the new transactions.
460  if (auto h = m_host.lock())
461  h->noteNewTransactions();
463  ctrace << "Processed " << newPendingReceipts.size() << " transactions in" << (timer.elapsed() * 1000) << "(" << (bool)m_syncTransactionQueue << ")";
464 }
466 void Client::onDeadBlocks(h256s const& _blocks, h256Hash& io_changed)
467 {
468  // insert transactions that we are declaring the dead part of the chain
469  for (auto const& h: _blocks)
470  {
471  clog(ClientTrace) << "Dead block:" << h;
472  for (auto const& t: bc().transactions(h))
473  {
474  clog(ClientTrace) << "Resubmitting dead-block transaction " << Transaction(t, CheckTransaction::None);
475  ctrace << "Resubmitting dead-block transaction " << Transaction(t, CheckTransaction::None);
477  }
478  }
480  for (auto const& h: _blocks)
481  appendFromBlock(h, BlockPolarity::Dead, io_changed);
482 }
484 void Client::onNewBlocks(h256s const& _blocks, h256Hash& io_changed)
485 {
486  // remove transactions from m_tq nicely rather than relying on out of date nonce later on.
487  for (auto const& h: _blocks)
488  clog(ClientTrace) << "Live block:" << h;
490  if (auto h = m_host.lock())
491  h->noteNewBlocks();
493  for (auto const& h: _blocks)
494  appendFromBlock(h, BlockPolarity::Live, io_changed);
495 }
// NOTE(review): the signature line is absent from this listing (presumably
// resyncStateFromChain(), per the commented-out trace — confirm).
// Does nothing while a major sync is in progress. Otherwise a copy of m_preSeal is
// synced with the chain; if that changed it, or the post-seal author differs from the
// pre-seal author, m_preSeal and m_working are replaced by the synced copy and, unless
// m_postSeal is a sealed block whose hash is the new block's parent, the post-seal
// pending transactions are re-imported into the transaction queue.
// NOTE(review): the embedded numbering skips lines here; in particular the statement
// governed by `if (ir != ImportResult::Success)` is missing, as are probable lock
// guards — confirm against the full file.
498 {
501 // ctrace << "resyncStateFromChain()";
503  if (!isMajorSyncing())
504  {
505  bool preChanged = false;
506  Block newPreMine(chainParams().accountStartNonce);
508  newPreMine = m_preSeal;
510  // TODO: use m_postSeal to avoid re-evaluating our own blocks.
511  preChanged = newPreMine.sync(bc());
513  if (preChanged || m_postSeal.author() != m_preSeal.author())
514  {
516  m_preSeal = newPreMine;
518  m_working = newPreMine;
520  if (!m_postSeal.isSealed() || m_postSeal.info().hash() != newPreMine.info().parentHash())
521  for (auto const& t: m_postSeal.pending())
522  {
523  clog(ClientTrace) << "Resubmitting post-seal transaction " << t;
524 // ctrace << "Resubmitting post-seal transaction " << t;
525  auto ir = m_tq.import(t, IfDropped::Retry);
526  if (ir != ImportResult::Success)
528  }
533  }
535  // Quick hack for now - the TQ at this point already has the prior pending transactions in it;
536  // we should resync with it manually until we are stricter about what constitutes "knowing".
538  }
539 }
// NOTE(review): the signature line is absent from this listing (presumably
// resetState(), which doWork() calls when m_needStateReset is set — confirm).
// Replaces m_working with a copy of m_preSeal.
// NOTE(review): the embedded numbering skips lines (possibly guards and follow-up
// calls) — confirm against the full file.
542 {
543  Block newPreMine(chainParams().accountStartNonce);
545  newPreMine = m_preSeal;
548  m_working = newPreMine;
554 }
// NOTE(review): the signature line is absent from this listing (presumably
// onChainChanged(_ir), per the commented-out trace and the call made with an
// ImportRoute elsewhere in this file — confirm).
// Processes an import route in order: dead blocks first, then every transaction listed
// as good is dropped from the transaction queue via dropGood(), then the live blocks;
// finally watches are notified of the accumulated filter changes.
557 {
558 // ctrace << "onChainChanged()";
559  h256Hash changeds;
560  onDeadBlocks(_ir.deadBlocks, changeds);
561  for (auto const& t: _ir.goodTranactions)
562  {
563  clog(ClientTrace) << "Safely dropping transaction " << t.sha3();
564  m_tq.dropGood(t);
565  }
566  onNewBlocks(_ir.liveBlocks, changeds);
568  noteChanged(changeds);
569 }
// NOTE(review): the signature line is absent from this listing (presumably
// remoteActive(), which is called elsewhere in this file — confirm).
// Returns true when m_lastGetWork is less than 30 seconds in the past.
572 {
573  return chrono::system_clock::now() - m_lastGetWork < chrono::seconds(30);
574 }
// NOTE(review): the signature line is absent from this listing — confirm the function name.
// Logs "Post state changed.", notifies every waiter on m_signalled and clears
// m_remoteWorking.
577 {
578  clog(ClientTrace) << "Post state changed.";
579  m_signalled.notify_all();
580  m_remoteWorking = false;
581 }
// NOTE(review): the signature line is absent from this listing (presumably
// startSealing(), which is called elsewhere in this file — confirm).
// No-op when m_wouldSeal is already set. Otherwise logs the beneficiary; if author()
// is non-zero it sets m_wouldSeal and notifies every waiter on m_signalled, else it
// logs that an author must be set first.
584 {
585  if (m_wouldSeal == true)
586  return;
587  clog(ClientNote) << "Mining Beneficiary: " << author();
588  if (author())
589  {
590  m_wouldSeal = true;
591  m_signalled.notify_all();
592  }
593  else
594  clog(ClientNote) << "You need to set an author in order to seal!";
595 }
// NOTE(review): the signature line is absent from this listing (presumably
// rejigSealing(), which doWork() calls — confirm).
// Acts only when sealing is wanted (wouldSeal() or remoteActive()) and no major sync is
// running. If the seal engine says it should seal, m_wouldButShouldnot is cleared, an
// already-sealed m_working causes an early return, and when wouldSeal() holds a
// callback is registered that submits each generated seal via submitSealed(). If the
// engine declines, m_wouldButShouldnot is set instead.
// NOTE(review): the embedded numbering skips many lines here (guards, the contents of
// one brace pair, the statement after the final `if (!m_wouldSeal)`), so this listing
// is not the complete logic — confirm against the full file.
598 {
599  if ((wouldSeal() || remoteActive()) && !isMajorSyncing())
600  {
601  if (sealEngine()->shouldSeal(this))
602  {
603  m_wouldButShouldnot = false;
605  clog(ClientTrace) << "Rejigging seal engine...";
607  {
608  if (m_working.isSealed())
609  {
610  clog(ClientNote) << "Tried to seal sealed block...";
611  return;
612  }
614  }
616  {
620  }
622  if (wouldSeal())
623  {
624  sealEngine()->onSealGenerated([=](bytes const& header){
625  if (!this->submitSealed(header))
626  clog(ClientNote) << "Submitting block failed...";
627  });
628  ctrace << "Generating seal on" << m_sealingInfo.hash(WithoutSeal) << "#" << m_sealingInfo.number();
630  }
631  }
632  else
633  m_wouldButShouldnot = true;
634  }
635  if (!m_wouldSeal)
637 }
639 void Client::noteChanged(h256Hash const& _filters)
640 {
642  if (_filters.size())
643  filtersStreamOut(cwatch << "noteChanged:", _filters);
644  // accrue all changes left in each filter into the watches.
645  for (auto& w: m_watches)
646  if (_filters.count(w.second.id))
647  {
648  if (m_filters.count(w.second.id))
649  {
650  cwatch << "!!!" << w.first << w.second.id.abridged();
651  w.second.changes += m_filters.at(w.second.id).changes;
652  }
653  else if (m_specialFilters.count(w.second.id))
654  for (h256 const& hash: m_specialFilters.at(w.second.id))
655  {
656  cwatch << "!!!" << w.first << LogTag::Special << (w.second.id == PendingChangedFilter ? "pending" : w.second.id == ChainChangedFilter ? "chain" : "???");
657  w.second.changes.push_back(LocalisedLogEntry(SpecialLogEntry, hash));
658  }
659  }
660  // clear the filters now.
661  for (auto& i: m_filters)
662  i.second.changes.clear();
663  for (auto& i: m_specialFilters)
664  i.second.clear();
665 }
/// One iteration of the client's work loop.
/// Runs syncBlockQueue() if m_syncBlockQueue was set (clearing it atomically), resets
/// state when m_needStateReset is set, calls rejigSealing(), and finally waits up to
/// one second on m_signalled when neither queue is flagged for syncing and either
/// @a _doWait is true or the working block is sealed.
/// NOTE(review): the embedded numbering skips lines here. As listed, the
/// transaction-queue `if` directly governs tick(); the statement it originally
/// governed (and probable lock guards) appear to be missing — confirm against the
/// full file before relying on this control flow.
667 void Client::doWork(bool _doWait)
668 {
669  bool t = true;
670  if (m_syncBlockQueue.compare_exchange_strong(t, false))
671  syncBlockQueue();
673  if (m_needStateReset)
674  {
675  resetState();
676  m_needStateReset = false;
677  }
679  t = true;
680  bool isSealed = false;
682  isSealed = m_working.isSealed();
683  if (!isSealed && !isSyncing() && !m_remoteWorking && m_syncTransactionQueue.compare_exchange_strong(t, false))
686  tick();
688  rejigSealing();
693  isSealed = m_working.isSealed();
694  // If the block is sealed, we have to wait for it to tickle through the block queue
695  // (which only signals as wanting to be synced if it is ready).
696  if (!m_syncBlockQueue && !m_syncTransactionQueue && (_doWait || isSealed))
697  {
698  std::unique_lock<std::mutex> l(x_signalled);
699  m_signalled.wait_for(l, chrono::seconds(1));
700  }
701 }
// NOTE(review): the signature line is absent from this listing (presumably tick(),
// which doWork() calls — confirm).
// At most once per second: increments m_report.ticks, ticks the block queue and
// records the time in m_lastTick.
// NOTE(review): the statement governed by `if (m_report.ticks == 15)` is missing from
// this listing, as is at least one other line — confirm against the full file.
704 {
705  if (chrono::system_clock::now() - m_lastTick > chrono::seconds(1))
706  {
707  m_report.ticks++;
709  m_bq.tick();
710  m_lastTick = chrono::system_clock::now();
711  if (m_report.ticks == 15)
713  }
714 }
// NOTE(review): the signature line is absent from this listing — confirm the function name.
// At most once every 5 seconds: uninstalls every watch whose lastPoll is not
// time_point::max() and lies more than 20 seconds in the past, then calls
// bc().garbageCollect() and records the time in m_lastGarbageCollection.
// Keys are collected first and uninstalled in a second pass.
// NOTE(review): the embedded numbering skips a line before the watch loop, possibly a
// lock guard — confirm against the full file.
717 {
718  if (chrono::system_clock::now() - m_lastGarbageCollection > chrono::seconds(5))
719  {
720  // watches garbage collection
721  vector<unsigned> toUninstall;
723  for (auto key: keysOf(m_watches))
724  if (m_watches[key].lastPoll != chrono::system_clock::time_point::max() && chrono::system_clock::now() - m_watches[key].lastPoll > chrono::seconds(20))
725  {
726  toUninstall.push_back(key);
727  clog(ClientTrace) << "GC: Uninstall" << key << "(" << chrono::duration_cast<chrono::seconds>(chrono::system_clock::now() - m_watches[key].lastPoll).count() << "s old)";
728  }
729  for (auto i: toUninstall)
730  uninstallWatch(i);
732  // blockchain GC
733  bc().garbageCollect();
735  m_lastGarbageCollection = chrono::system_clock::now();
736  }
737 }
// NOTE(review): the signature line is absent from this listing — confirm the function name.
// The body only calls startWorking().
740 {
741  startWorking();
742 }
744 Block Client::block(h256 const& _block) const
745 {
746  try
747  {
748  Block ret(bc(), m_stateDB);
749  ret.populateFromChain(bc(), _block);
750  return ret;
751  }
752  catch (Exception& ex)
753  {
754  ex << errinfo_block(bc().block(_block));
755  onBadBlock(ex);
756  return Block(bc());
757  }
758 }
760 Block Client::block(h256 const& _blockHash, PopulationStatistics* o_stats) const
761 {
762  try
763  {
764  Block ret(bc(), m_stateDB);
765  PopulationStatistics s = ret.populateFromChain(bc(), _blockHash);
766  if (o_stats)
767  swap(s, *o_stats);
768  return ret;
769  }
770  catch (Exception& ex)
771  {
772  ex << errinfo_block(bc().block(_blockHash));
773  onBadBlock(ex);
774  return Block(bc());
775  }
776 }
778 State Client::state(unsigned _txi, h256 const& _blockHash) const
779 {
780  try
781  {
782  return block(_blockHash).fromPending(_txi);
783  }
784  catch (Exception& ex)
785  {
786  ex << errinfo_block(bc().block(_blockHash));
787  onBadBlock(ex);
788  return State(chainParams().accountStartNonce);
789  }
790 }
/// Returns the result of m_postSeal.fromPending(_txi).
/// NOTE(review): as listed, the assert and the second return are unreachable. The
/// embedded numbering skips a line just before the first return (possibly a guard
/// construct that makes the trailing return necessary for the compiler), so do not
/// remove them without checking the full file.
792 eth::State Client::state(unsigned _txi) const
793 {
795  return m_postSeal.fromPending(_txi);
796  assert(false);
797  return State(chainParams().accountStartNonce);
798 }
// NOTE(review): the signature line is absent from this listing — confirm the function name.
// The body only runs one doWork() pass with its default argument.
801 {
802  doWork();
803 }
// NOTE(review): the signature line is absent from this listing (presumably the
// sync-status getter — confirm).
// Returns a default-constructed SyncStatus when m_host can no longer be locked;
// otherwise the host's status with majorSyncing overwritten by isMajorSyncing().
806 {
807  auto h = m_host.lock();
808  if (!h)
809  return SyncStatus();
810  SyncStatus status = h->status();
811  status.majorSyncing = isMajorSyncing();
812  return status;
813 }
/// Seals the working block with @a _header and queues the resulting block for import.
/// @returns false when m_working.sealBlock() rejects the header; otherwise whether the
/// block queue import reported ImportResult::Success.
/// NOTE(review): `l`, which the UpgradeGuard is built from, is not declared in this
/// listing; the embedded numbering skips the line where it would be (and two more
/// after the inner block) — confirm against the full file.
815 bool Client::submitSealed(bytes const& _header)
816 {
817  bytes newBlock;
818  {
820  {
821  UpgradeGuard l2(l);
822  if (!m_working.sealBlock(_header))
823  return false;
824  }
827  newBlock = m_working.blockData();
828  }
830  // OPTIMISE: very inefficient to not utilise the existing OverlayDB in m_postSeal that contains all trie changes.
831  return m_bq.import(&newBlock, true) == ImportResult::Success;
832 }
834 void Client::rewind(unsigned _n)
835 {
836  executeInMainThread([=]() {
837  bc().rewind(_n);
839  });
841  for (unsigned i = 0; i < 10; ++i)
842  {
843  u256 n;
845  n = m_working.info().number();
846  if (n == _n + 1)
847  break;
848  this_thread::sleep_for(std::chrono::milliseconds(50));
849  }
850  auto h = m_host.lock();
851  if (h)
852  h->reset();
853 }
