Fabcoin Core  0.16.2
P2P Digital Currency
network.cpp
Go to the documentation of this file.
1 // network.cpp - written and placed in the public domain by Wei Dai
2 
3 #include "pch.h"
4 
5 #include "network.h"
6 
7 #if !defined(NO_OS_DEPENDENCE) && defined(SOCKETS_AVAILABLE)
8 
9 #include "wait.h"
10 
11 #define CRYPTOPP_TRACE_NETWORK 0
12 
14 
15 lword LimitedBandwidth::ComputeCurrentTransceiveLimit()
16 {
17  if (!m_maxBytesPerSecond)
18  return ULONG_MAX;
19 
20  const double curTime = GetCurTimeAndCleanUp();
21  CRYPTOPP_UNUSED(curTime);
22 
23  lword total = 0;
24  for (OpQueue::size_type i=0; i!=m_ops.size(); ++i)
25  total += m_ops[i].second;
26  return SaturatingSubtract(m_maxBytesPerSecond, total);
27 }
28 
29 double LimitedBandwidth::TimeToNextTransceive()
30 {
31  if (!m_maxBytesPerSecond)
32  return 0;
33 
34  if (!m_nextTransceiveTime)
35  ComputeNextTransceiveTime();
36 
37  return SaturatingSubtract(m_nextTransceiveTime, m_timer.ElapsedTimeAsDouble());
38 }
39 
40 void LimitedBandwidth::NoteTransceive(lword size)
41 {
42  if (m_maxBytesPerSecond)
43  {
44  double curTime = GetCurTimeAndCleanUp();
45  m_ops.push_back(std::make_pair(curTime, size));
46  m_nextTransceiveTime = 0;
47  }
48 }
49 
50 void LimitedBandwidth::ComputeNextTransceiveTime()
51 {
52  double curTime = GetCurTimeAndCleanUp();
53  lword total = 0;
54  for (unsigned int i=0; i!=m_ops.size(); ++i)
55  total += m_ops[i].second;
56  m_nextTransceiveTime =
57  (total < m_maxBytesPerSecond) ? curTime : m_ops.front().first + 1000;
58 }
59 
60 double LimitedBandwidth::GetCurTimeAndCleanUp()
61 {
62  if (!m_maxBytesPerSecond)
63  return 0;
64 
65  double curTime = m_timer.ElapsedTimeAsDouble();
66  while (m_ops.size() && (m_ops.front().first + 1000 < curTime))
67  m_ops.pop_front();
68  return curTime;
69 }
70 
71 void LimitedBandwidth::GetWaitObjects(WaitObjectContainer &container, const CallStack &callStack)
72 {
73  double nextTransceiveTime = TimeToNextTransceive();
74  if (nextTransceiveTime)
75  container.ScheduleEvent(nextTransceiveTime, CallStack("LimitedBandwidth::GetWaitObjects()", &callStack));
76 }
77 
78 // *************************************************************
79 
80 size_t NonblockingSource::GeneralPump2(
81  lword& byteCount, bool blockingOutput,
82  unsigned long maxTime, bool checkDelimiter, byte delimiter)
83 {
84  m_blockedBySpeedLimit = false;
85 
86  if (!GetMaxBytesPerSecond())
87  {
88  size_t ret = DoPump(byteCount, blockingOutput, maxTime, checkDelimiter, delimiter);
89  m_doPumpBlocked = (ret != 0);
90  return ret;
91  }
92 
93  bool forever = (maxTime == INFINITE_TIME);
94  unsigned long timeToGo = maxTime;
95  Timer timer(Timer::MILLISECONDS, forever);
96  lword maxSize = byteCount;
97  byteCount = 0;
98 
99  timer.StartTimer();
100 
101  while (true)
102  {
103  lword curMaxSize = UnsignedMin(ComputeCurrentTransceiveLimit(), maxSize - byteCount);
104 
105  if (curMaxSize || m_doPumpBlocked)
106  {
107  if (!forever) timeToGo = SaturatingSubtract(maxTime, timer.ElapsedTime());
108  size_t ret = DoPump(curMaxSize, blockingOutput, timeToGo, checkDelimiter, delimiter);
109  m_doPumpBlocked = (ret != 0);
110  if (curMaxSize)
111  {
112  NoteTransceive(curMaxSize);
113  byteCount += curMaxSize;
114  }
115  if (ret)
116  return ret;
117  }
118 
119  if (maxSize != ULONG_MAX && byteCount >= maxSize)
120  break;
121 
122  if (!forever)
123  {
124  timeToGo = SaturatingSubtract(maxTime, timer.ElapsedTime());
125  if (!timeToGo)
126  break;
127  }
128 
129  double waitTime = TimeToNextTransceive();
130  if (!forever && waitTime > timeToGo)
131  {
132  m_blockedBySpeedLimit = true;
133  break;
134  }
135 
136  WaitObjectContainer container;
137  LimitedBandwidth::GetWaitObjects(container, CallStack("NonblockingSource::GeneralPump2() - speed limit", 0));
138  container.Wait((unsigned long)waitTime);
139  }
140 
141  return 0;
142 }
143 
144 size_t NonblockingSource::PumpMessages2(unsigned int &messageCount, bool blocking)
145 {
146  if (messageCount == 0)
147  return 0;
148 
149  messageCount = 0;
150 
151  lword byteCount;
152  do {
153  byteCount = LWORD_MAX;
154  RETURN_IF_NONZERO(Pump2(byteCount, blocking));
155  } while(byteCount == LWORD_MAX);
156 
157  if (!m_messageEndSent && SourceExhausted())
158  {
159  RETURN_IF_NONZERO(AttachedTransformation()->Put2(NULL, 0, GetAutoSignalPropagation(), true));
160  m_messageEndSent = true;
161  messageCount = 1;
162  }
163  return 0;
164 }
165 
166 lword NonblockingSink::TimedFlush(unsigned long maxTime, size_t targetSize)
167 {
168  m_blockedBySpeedLimit = false;
169 
170  size_t curBufSize = GetCurrentBufferSize();
171  if (curBufSize <= targetSize && (targetSize || !EofPending()))
172  return 0;
173 
174  if (!GetMaxBytesPerSecond())
175  return DoFlush(maxTime, targetSize);
176 
177  bool forever = (maxTime == INFINITE_TIME);
178  unsigned long timeToGo = maxTime;
179  Timer timer(Timer::MILLISECONDS, forever);
180  lword totalFlushed = 0;
181 
182  timer.StartTimer();
183 
184  while (true)
185  {
186  size_t flushSize = UnsignedMin(curBufSize - targetSize, ComputeCurrentTransceiveLimit());
187  if (flushSize || EofPending())
188  {
189  if (!forever) timeToGo = SaturatingSubtract(maxTime, timer.ElapsedTime());
190  size_t ret = (size_t)DoFlush(timeToGo, curBufSize - flushSize);
191  if (ret)
192  {
193  NoteTransceive(ret);
194  curBufSize -= ret;
195  totalFlushed += ret;
196  }
197  }
198 
199  if (curBufSize <= targetSize && (targetSize || !EofPending()))
200  break;
201 
202  if (!forever)
203  {
204  timeToGo = SaturatingSubtract(maxTime, timer.ElapsedTime());
205  if (!timeToGo)
206  break;
207  }
208 
209  double waitTime = TimeToNextTransceive();
210  if (!forever && waitTime > timeToGo)
211  {
212  m_blockedBySpeedLimit = true;
213  break;
214  }
215 
216  WaitObjectContainer container;
217  LimitedBandwidth::GetWaitObjects(container, CallStack("NonblockingSink::TimedFlush() - speed limit", 0));
218  container.Wait((unsigned long)waitTime);
219  }
220 
221  return totalFlushed;
222 }
223 
224 bool NonblockingSink::IsolatedFlush(bool hardFlush, bool blocking)
225 {
226  TimedFlush(blocking ? INFINITE_TIME : 0);
227  return hardFlush && (!!GetCurrentBufferSize() || EofPending());
228 }
229 
230 // *************************************************************
231 
232 NetworkSource::NetworkSource(BufferedTransformation *attachment)
233  : NonblockingSource(attachment), m_buf(1024*16)
234  , m_putSize(0), m_dataBegin(0), m_dataEnd(0)
235  , m_waitingForResult(false), m_outputBlocked(false)
236 {
237 }
238 
239 unsigned int NetworkSource::GetMaxWaitObjectCount() const
240 {
241  return LimitedBandwidth::GetMaxWaitObjectCount()
242  + GetReceiver().GetMaxWaitObjectCount()
243  + AttachedTransformation()->GetMaxWaitObjectCount();
244 }
245 
246 void NetworkSource::GetWaitObjects(WaitObjectContainer &container, CallStack const& callStack)
247 {
248  if (BlockedBySpeedLimit())
249  LimitedBandwidth::GetWaitObjects(container, CallStack("NetworkSource::GetWaitObjects() - speed limit", &callStack));
250  else if (!m_outputBlocked)
251  {
252  if (m_dataBegin == m_dataEnd)
253  AccessReceiver().GetWaitObjects(container, CallStack("NetworkSource::GetWaitObjects() - no data", &callStack));
254  else
255  container.SetNoWait(CallStack("NetworkSource::GetWaitObjects() - have data", &callStack));
256  }
257 
258  AttachedTransformation()->GetWaitObjects(container, CallStack("NetworkSource::GetWaitObjects() - attachment", &callStack));
259 }
260 
261 size_t NetworkSource::DoPump(lword &byteCount, bool blockingOutput, unsigned long maxTime, bool checkDelimiter, byte delimiter)
262 {
263  NetworkReceiver &receiver = AccessReceiver();
264 
265  lword maxSize = byteCount;
266  byteCount = 0;
267  bool forever = maxTime == INFINITE_TIME;
268  Timer timer(Timer::MILLISECONDS, forever);
269  BufferedTransformation *t = AttachedTransformation();
270 
271  if (m_outputBlocked)
272  goto DoOutput;
273 
274  while (true)
275  {
276  if (m_dataBegin == m_dataEnd)
277  {
278  if (receiver.EofReceived())
279  break;
280 
281  if (m_waitingForResult)
282  {
283  if (receiver.MustWaitForResult() &&
284  !receiver.Wait(SaturatingSubtract(maxTime, timer.ElapsedTime()),
285  CallStack("NetworkSource::DoPump() - wait receive result", 0)))
286  break;
287 
288  unsigned int recvResult = receiver.GetReceiveResult();
289 #if CRYPTOPP_TRACE_NETWORK
290  OutputDebugString((IntToString((unsigned int)this) + ": Received " + IntToString(recvResult) + " bytes\n").c_str());
291 #endif
292  m_dataEnd += recvResult;
293  m_waitingForResult = false;
294 
295  if (!receiver.MustWaitToReceive() && !receiver.EofReceived() && m_dataEnd != m_buf.size())
296  goto ReceiveNoWait;
297  }
298  else
299  {
300  m_dataEnd = m_dataBegin = 0;
301 
302  if (receiver.MustWaitToReceive())
303  {
304  if (!receiver.Wait(SaturatingSubtract(maxTime, timer.ElapsedTime()),
305  CallStack("NetworkSource::DoPump() - wait receive", 0)))
306  break;
307 
308  receiver.Receive(m_buf+m_dataEnd, m_buf.size()-m_dataEnd);
309  m_waitingForResult = true;
310  }
311  else
312  {
313 ReceiveNoWait:
314  m_waitingForResult = true;
315  // call Receive repeatedly as long as data is immediately available,
316  // because some receivers tend to return data in small pieces
317 #if CRYPTOPP_TRACE_NETWORK
318  OutputDebugString((IntToString((unsigned int)this) + ": Receiving " + IntToString(m_buf.size()-m_dataEnd) + " bytes\n").c_str());
319 #endif
320  while (receiver.Receive(m_buf+m_dataEnd, m_buf.size()-m_dataEnd))
321  {
322  unsigned int recvResult = receiver.GetReceiveResult();
323 #if CRYPTOPP_TRACE_NETWORK
324  OutputDebugString((IntToString((unsigned int)this) + ": Received " + IntToString(recvResult) + " bytes\n").c_str());
325 #endif
326  m_dataEnd += recvResult;
327  if (receiver.EofReceived() || m_dataEnd > m_buf.size() /2)
328  {
329  m_waitingForResult = false;
330  break;
331  }
332  }
333  }
334  }
335  }
336  else
337  {
338  m_putSize = UnsignedMin(m_dataEnd - m_dataBegin, maxSize - byteCount);
339 
340  if (checkDelimiter)
341  m_putSize = std::find(m_buf+m_dataBegin, m_buf+m_dataBegin+m_putSize, delimiter) - (m_buf+m_dataBegin);
342 
343 DoOutput:
344  size_t result = t->PutModifiable2(m_buf+m_dataBegin, m_putSize, 0, forever || blockingOutput);
345  if (result)
346  {
347  if (t->Wait(SaturatingSubtract(maxTime, timer.ElapsedTime()),
348  CallStack("NetworkSource::DoPump() - wait attachment", 0)))
349  goto DoOutput;
350  else
351  {
352  m_outputBlocked = true;
353  return result;
354  }
355  }
356  m_outputBlocked = false;
357 
358  byteCount += m_putSize;
359  m_dataBegin += m_putSize;
360  if (checkDelimiter && m_dataBegin < m_dataEnd && m_buf[m_dataBegin] == delimiter)
361  break;
362  if (maxSize != ULONG_MAX && byteCount == maxSize)
363  break;
364  // once time limit is reached, return even if there is more data waiting
365  // but make 0 a special case so caller can request a large amount of data to be
366  // pumped as long as it is immediately available
367  if (maxTime > 0 && timer.ElapsedTime() > maxTime)
368  break;
369  }
370  }
371 
372  return 0;
373 }
374 
375 // *************************************************************
376 
377 NetworkSink::NetworkSink(unsigned int maxBufferSize, unsigned int autoFlushBound)
378  : m_maxBufferSize(maxBufferSize), m_autoFlushBound(autoFlushBound)
379  , m_needSendResult(false), m_wasBlocked(false), m_eofState(EOF_NONE)
380  , m_buffer(STDMIN(16U*1024U+256, maxBufferSize)), m_skipBytes(0)
381  , m_speedTimer(Timer::MILLISECONDS), m_byteCountSinceLastTimerReset(0)
382  , m_currentSpeed(0), m_maxObservedSpeed(0)
383 {
384 }
385 
386 float NetworkSink::ComputeCurrentSpeed()
387 {
388  if (m_speedTimer.ElapsedTime() > 1000)
389  {
390  m_currentSpeed = m_byteCountSinceLastTimerReset * 1000 / m_speedTimer.ElapsedTime();
391  m_maxObservedSpeed = STDMAX(m_currentSpeed, m_maxObservedSpeed * 0.98f);
392  m_byteCountSinceLastTimerReset = 0;
393  m_speedTimer.StartTimer();
394 // OutputDebugString(("max speed: " + IntToString((int)m_maxObservedSpeed) + " current speed: " + IntToString((int)m_currentSpeed) + "\n").c_str());
395  }
396  return m_currentSpeed;
397 }
398 
399 float NetworkSink::GetMaxObservedSpeed() const
400 {
401  lword m = GetMaxBytesPerSecond();
402  return m ? STDMIN(m_maxObservedSpeed, static_cast<float>(m)) : m_maxObservedSpeed;
403 }
404 
405 unsigned int NetworkSink::GetMaxWaitObjectCount() const
406 {
407  return LimitedBandwidth::GetMaxWaitObjectCount() + GetSender().GetMaxWaitObjectCount();
408 }
409 
410 void NetworkSink::GetWaitObjects(WaitObjectContainer &container, CallStack const& callStack)
411 {
412  if (BlockedBySpeedLimit())
413  LimitedBandwidth::GetWaitObjects(container, CallStack("NetworkSink::GetWaitObjects() - speed limit", &callStack));
414  else if (m_wasBlocked)
415  AccessSender().GetWaitObjects(container, CallStack("NetworkSink::GetWaitObjects() - was blocked", &callStack));
416  else if (!m_buffer.IsEmpty())
417  AccessSender().GetWaitObjects(container, CallStack("NetworkSink::GetWaitObjects() - buffer not empty", &callStack));
418  else if (EofPending())
419  AccessSender().GetWaitObjects(container, CallStack("NetworkSink::GetWaitObjects() - EOF pending", &callStack));
420 }
421 
422 size_t NetworkSink::Put2(const byte *inString, size_t length, int messageEnd, bool blocking)
423 {
424  if (m_eofState == EOF_DONE)
425  {
426  if (length || messageEnd)
427  throw Exception(Exception::OTHER_ERROR, "NetworkSink::Put2() being called after EOF had been sent");
428 
429  return 0;
430  }
431 
432  if (m_eofState > EOF_NONE)
433  goto EofSite;
434 
435  {
436  if (m_skipBytes)
437  {
438  CRYPTOPP_ASSERT(length >= m_skipBytes);
439  inString += m_skipBytes;
440  length -= m_skipBytes;
441  }
442 
443  m_buffer.Put(inString, length);
444 
445  if (!blocking || m_buffer.CurrentSize() > m_autoFlushBound)
446  TimedFlush(0, 0);
447 
448  size_t targetSize = messageEnd ? 0 : m_maxBufferSize;
449  if (blocking)
450  TimedFlush(INFINITE_TIME, targetSize);
451 
452  if (m_buffer.CurrentSize() > targetSize)
453  {
454  CRYPTOPP_ASSERT(!blocking);
455  m_wasBlocked = true;
456  m_skipBytes += length;
457  size_t blockedBytes = UnsignedMin(length, m_buffer.CurrentSize() - targetSize);
458  return STDMAX<size_t>(blockedBytes, 1);
459  }
460 
461  m_wasBlocked = false;
462  m_skipBytes = 0;
463  }
464 
465  if (messageEnd)
466  {
467  m_eofState = EOF_PENDING_SEND;
468 
469  EofSite:
470  TimedFlush(blocking ? INFINITE_TIME : 0, 0);
471  if (m_eofState != EOF_DONE)
472  return 1;
473  }
474 
475  return 0;
476 }
477 
478 lword NetworkSink::DoFlush(unsigned long maxTime, size_t targetSize)
479 {
480  NetworkSender &sender = AccessSender();
481 
482  bool forever = maxTime == INFINITE_TIME;
483  Timer timer(Timer::MILLISECONDS, forever);
484  unsigned int totalFlushSize = 0;
485 
486  while (true)
487  {
488  if (m_buffer.CurrentSize() <= targetSize)
489  break;
490 
491  if (m_needSendResult)
492  {
493  if (sender.MustWaitForResult() &&
494  !sender.Wait(SaturatingSubtract(maxTime, timer.ElapsedTime()),
495  CallStack("NetworkSink::DoFlush() - wait send result", 0)))
496  break;
497 
498  unsigned int sendResult = sender.GetSendResult();
499 #if CRYPTOPP_TRACE_NETWORK
500  OutputDebugString((IntToString((unsigned int)this) + ": Sent " + IntToString(sendResult) + " bytes\n").c_str());
501 #endif
502  m_buffer.Skip(sendResult);
503  totalFlushSize += sendResult;
504  m_needSendResult = false;
505 
506  if (!m_buffer.AnyRetrievable())
507  break;
508  }
509 
510  unsigned long timeOut = maxTime ? SaturatingSubtract(maxTime, timer.ElapsedTime()) : 0;
511  if (sender.MustWaitToSend() && !sender.Wait(timeOut, CallStack("NetworkSink::DoFlush() - wait send", 0)))
512  break;
513 
514  size_t contiguousSize = 0;
515  const byte *block = m_buffer.Spy(contiguousSize);
516 
517 #if CRYPTOPP_TRACE_NETWORK
518  OutputDebugString((IntToString((unsigned int)this) + ": Sending " + IntToString(contiguousSize) + " bytes\n").c_str());
519 #endif
520  sender.Send(block, contiguousSize);
521  m_needSendResult = true;
522 
523  if (maxTime > 0 && timeOut == 0)
524  break; // once time limit is reached, return even if there is more data waiting
525  }
526 
527  m_byteCountSinceLastTimerReset += totalFlushSize;
528  ComputeCurrentSpeed();
529 
530  if (m_buffer.IsEmpty() && !m_needSendResult)
531  {
532  if (m_eofState == EOF_PENDING_SEND)
533  {
534  sender.SendEof();
535  m_eofState = sender.MustWaitForEof() ? EOF_PENDING_DELIVERY : EOF_DONE;
536  }
537 
538  while (m_eofState == EOF_PENDING_DELIVERY)
539  {
540  unsigned long timeOut = maxTime ? SaturatingSubtract(maxTime, timer.ElapsedTime()) : 0;
541  if (!sender.Wait(timeOut, CallStack("NetworkSink::DoFlush() - wait EOF", 0)))
542  break;
543 
544  if (sender.EofSent())
545  m_eofState = EOF_DONE;
546  }
547  }
548 
549  return totalFlushSize;
550 }
551 
553 
554 #endif // #ifdef SOCKETS_AVAILABLE
Base class for all exceptions thrown by the library.
Definition: cryptlib.h:140
uint8_t byte
Definition: Common.h:57
const lword LWORD_MAX
Definition: config.h:246
high resolution timer
Definition: hrtimer.h:57
#define NAMESPACE_BEGIN(x)
Definition: config.h:200
Some other error occurred not belonging to other categories.
Definition: cryptlib.h:159
Interface for buffered transformations.
Definition: cryptlib.h:1352
T1 SaturatingSubtract(const T1 &a, const T2 &b)
Performs a saturating subtract clamped at 0.
Definition: misc.h:847
const T1 UnsignedMin(const T1 &a, const T2 &b)
Safe comparison of values that could be negative and incorrectly promoted.
Definition: misc.h:512
const T & STDMIN(const T &a, const T &b)
Replacement function for std::min.
Definition: misc.h:477
#define CRYPTOPP_ASSERT(exp)
Definition: trap.h:92
const unsigned long INFINITE_TIME
Represents infinite time.
Definition: cryptlib.h:111
#define f(x)
Definition: gost.cpp:57
bool Wait(unsigned long milliseconds, CallStack const &callStack)
Wait on this object.
uint8_t const size_t const size
Definition: sha3.h:20
#define CRYPTOPP_UNUSED(x)
Definition: config.h:741
std::string IntToString(T value, unsigned int base=10)
Converts a value to a string.
Definition: misc.h:539
const T & STDMAX(const T &a, const T &b)
Replacement function for std::max.
Definition: misc.h:487
#define NAMESPACE_END
Definition: config.h:201
word64 lword
Definition: config.h:245
virtual size_t PutModifiable2(byte *inString, size_t length, int messageEnd, bool blocking)
Input multiple bytes that may be modified by callee.
Definition: cryptlib.h:1464
#define RETURN_IF_NONZERO(x)
Definition: misc.h:607