Fabcoin Core  0.16.2
P2P Digital Currency
checkqueue.h
Go to the documentation of this file.
1 // Copyright (c) 2012-2015 The Bitcoin Core developers
2 // Distributed under the MIT software license, see the accompanying
3 // file COPYING or http://www.opensource.org/licenses/mit-license.php.
4 
5 #ifndef FABCOIN_CHECKQUEUE_H
6 #define FABCOIN_CHECKQUEUE_H
7 
8 #include <sync.h>
9 
10 #include <algorithm>
11 #include <vector>
12 
13 #include <boost/thread/condition_variable.hpp>
14 #include <boost/thread/mutex.hpp>
15 
16 template <typename T>
18 
29 template <typename T>
31 {
32 private:
34  boost::mutex mutex;
35 
37  boost::condition_variable condWorker;
38 
40  boost::condition_variable condMaster;
41 
44  std::vector<T> queue;
45 
47  int nIdle;
48 
50  int nTotal;
51 
53  bool fAllOk;
54 
60  unsigned int nTodo;
61 
63  bool fQuit;
64 
66  unsigned int nBatchSize;
67 
69  bool Loop(bool fMaster = false)
70  {
71  boost::condition_variable& cond = fMaster ? condMaster : condWorker;
72  std::vector<T> vChecks;
73  vChecks.reserve(nBatchSize);
74  unsigned int nNow = 0;
75  bool fOk = true;
76  do {
77  {
78  boost::unique_lock<boost::mutex> lock(mutex);
79  // first do the clean-up of the previous loop run (allowing us to do it in the same critsect)
80  if (nNow) {
81  fAllOk &= fOk;
82  nTodo -= nNow;
83  if (nTodo == 0 && !fMaster)
84  // We processed the last element; inform the master it can exit and return the result
85  condMaster.notify_one();
86  } else {
87  // first iteration
88  nTotal++;
89  }
90  // logically, the do loop starts here
91  while (queue.empty()) {
92  if ((fMaster || fQuit) && nTodo == 0) {
93  nTotal--;
94  bool fRet = fAllOk;
95  // reset the status for new work later
96  if (fMaster)
97  fAllOk = true;
98  // return the current status
99  return fRet;
100  }
101  nIdle++;
102  cond.wait(lock); // wait
103  nIdle--;
104  }
105  // Decide how many work units to process now.
106  // * Do not try to do everything at once, but aim for increasingly smaller batches so
107  // all workers finish approximately simultaneously.
108  // * Try to account for idle jobs which will instantly start helping.
109  // * Don't do batches smaller than 1 (duh), or larger than nBatchSize.
110  nNow = std::max(1U, std::min(nBatchSize, (unsigned int)queue.size() / (nTotal + nIdle + 1)));
111  vChecks.resize(nNow);
112  for (unsigned int i = 0; i < nNow; i++) {
113  // We want the lock on the mutex to be as short as possible, so swap jobs from the global
114  // queue to the local batch vector instead of copying.
115  vChecks[i].swap(queue.back());
116  queue.pop_back();
117  }
118  // Check whether we need to do work at all
119  fOk = fAllOk;
120  }
121  // execute work
122  for (T& check : vChecks)
123  if (fOk)
124  fOk = check();
125  vChecks.clear();
126  } while (true);
127  }
128 
129 public:
131  boost::mutex ControlMutex;
132 
134  CCheckQueue(unsigned int nBatchSizeIn) : nIdle(0), nTotal(0), fAllOk(true), nTodo(0), fQuit(false), nBatchSize(nBatchSizeIn) {}
135 
137  void Thread()
138  {
139  Loop();
140  }
141 
143  bool Wait()
144  {
145  return Loop(true);
146  }
147 
149  void Add(std::vector<T>& vChecks)
150  {
151  boost::unique_lock<boost::mutex> lock(mutex);
152  for (T& check : vChecks) {
153  queue.push_back(T());
154  check.swap(queue.back());
155  }
156  nTodo += vChecks.size();
157  if (vChecks.size() == 1)
158  condWorker.notify_one();
159  else if (vChecks.size() > 1)
160  condWorker.notify_all();
161  }
162 
164  {
165  }
166 
167 };
168 
173 template <typename T>
174 class CCheckQueueControl
175 {
176 private:
178  bool fDone;
179 
180 public:
181  CCheckQueueControl() = delete;
182  CCheckQueueControl(const CCheckQueueControl&) = delete;
183  CCheckQueueControl& operator=(const CCheckQueueControl&) = delete;
184  explicit CCheckQueueControl(CCheckQueue<T> * const pqueueIn) : pqueue(pqueueIn), fDone(false)
185  {
186  // passed queue is supposed to be unused, or nullptr
187  if (pqueue != nullptr) {
189  }
190  }
191 
192  bool Wait()
193  {
194  if (pqueue == nullptr)
195  return true;
196  bool fRet = pqueue->Wait();
197  fDone = true;
198  return fRet;
199  }
200 
201  void Add(std::vector<T>& vChecks)
202  {
203  if (pqueue != nullptr)
204  pqueue->Add(vChecks);
205  }
206 
208  {
209  if (!fDone)
210  Wait();
211  if (pqueue != nullptr) {
213  }
214  }
215 };
216 
217 #endif // FABCOIN_CHECKQUEUE_H
void Add(std::vector< T > &vChecks)
Definition: checkqueue.h:201
boost::condition_variable condWorker
Worker threads block on this when out of work.
Definition: checkqueue.h:37
boost::mutex mutex
Mutex to protect the inner state.
Definition: checkqueue.h:34
#define T(i, x)
boost::condition_variable condMaster
Master thread blocks on this when out of work.
Definition: checkqueue.h:40
bool Loop(bool fMaster=false)
Internal function that does bulk of the verification work.
Definition: checkqueue.h:69
CCheckQueueControl(CCheckQueue< T > *const pqueueIn)
Definition: checkqueue.h:184
void Thread()
Worker thread.
Definition: checkqueue.h:137
RAII-style controller object for a CCheckQueue that guarantees the passed queue is finished before co...
Definition: checkqueue.h:17
ExecStats::duration min
Definition: ExecStats.cpp:35
CCheckQueue(unsigned int nBatchSizeIn)
Create a new check queue.
Definition: checkqueue.h:134
std::vector< T > queue
The queue of elements to be processed.
Definition: checkqueue.h:44
bool fAllOk
The temporary evaluation result.
Definition: checkqueue.h:53
#define LEAVE_CRITICAL_SECTION(cs)
Definition: sync.h:185
int nTotal
The total number of workers (including the master).
Definition: checkqueue.h:50
Queue for verifications that have to be performed.
Definition: checkqueue.h:30
CCheckQueue< T > *const pqueue
Definition: checkqueue.h:177
#define ENTER_CRITICAL_SECTION(cs)
Definition: sync.h:179
bool Wait()
Wait until execution finishes, and return whether all evaluations were successful.
Definition: checkqueue.h:143
int nIdle
The number of workers (including the master) that are idle.
Definition: checkqueue.h:47
bool fQuit
Whether we&#39;re shutting down.
Definition: checkqueue.h:63
unsigned int nTodo
Number of verifications that haven&#39;t completed yet.
Definition: checkqueue.h:60
void Add(std::vector< T > &vChecks)
Add a batch of checks to the queue.
Definition: checkqueue.h:149
unsigned int nBatchSize
The maximum number of elements to be processed in one batch.
Definition: checkqueue.h:66
dev::WithExisting max(dev::WithExisting _a, dev::WithExisting _b)
Definition: Common.h:326
boost::mutex ControlMutex
Mutex to ensure only one concurrent CCheckQueueControl.
Definition: checkqueue.h:131