// LimitedBandwidth::ComputeCurrentTransceiveLimit()
// Adds up the byte counts (the .second of each entry) of every operation
// currently held in m_ops, after GetCurTimeAndCleanUp() has been called.
// The statement taken when m_maxBytesPerSecond is zero, the declaration of
// `total`, and the final return are on lines not visible in this listing, so
// the exact value returned cannot be confirmed from here.
7 #if !defined(NO_OS_DEPENDENCE) && defined(SOCKETS_AVAILABLE) 11 #define CRYPTOPP_TRACE_NETWORK 0 15 lword LimitedBandwidth::ComputeCurrentTransceiveLimit()
// A zero m_maxBytesPerSecond is tested first (presumably "no limit" - TODO confirm).
17 if (!m_maxBytesPerSecond)
// curTime is not used by any visible line of this function; the call is
// presumably made for its clean-up effect on m_ops.
20 const double curTime = GetCurTimeAndCleanUp();
24 for (OpQueue::size_type i=0; i!=m_ops.size(); ++i)
25 total += m_ops[i].second;
// LimitedBandwidth::TimeToNextTransceive()
// Makes sure m_nextTransceiveTime has been computed before it is used: a value
// of zero is treated as "not computed yet" and triggers
// ComputeNextTransceiveTime(). The returned expression and the action taken
// when m_maxBytesPerSecond is zero are not visible in this listing.
29 double LimitedBandwidth::TimeToNextTransceive()
31 if (!m_maxBytesPerSecond)
// NoteTransceive() resets m_nextTransceiveTime to 0, which forces this
// recomputation on the next call.
34 if (!m_nextTransceiveTime)
35 ComputeNextTransceiveTime();
// LimitedBandwidth::NoteTransceive(size)
// Records a completed transfer of `size` bytes. The bookkeeping is only
// reached when m_maxBytesPerSecond is nonzero (the braces delimiting the
// if-body are not visible here; presumably all three statements are inside it).
40 void LimitedBandwidth::NoteTransceive(
lword size)
42 if (m_maxBytesPerSecond)
44 double curTime = GetCurTimeAndCleanUp();
// Each m_ops entry is a (timestamp, byte count) pair.
45 m_ops.push_back(std::make_pair(curTime, size));
// Zero marks the cached next-transceive time as stale; TimeToNextTransceive()
// recomputes it when it sees zero.
46 m_nextTransceiveTime = 0;
// LimitedBandwidth::ComputeNextTransceiveTime()
// Sets m_nextTransceiveTime from the bytes recorded in m_ops:
//   - if their sum is below m_maxBytesPerSecond, a transfer may happen now
//     (curTime);
//   - otherwise it is the timestamp of the oldest entry plus 1000.
// The declaration of `total` is on a line not visible in this listing.
50 void LimitedBandwidth::ComputeNextTransceiveTime()
52 double curTime = GetCurTimeAndCleanUp();
// NOTE(review): the index is `unsigned int` here, whereas the equivalent loop
// in ComputeCurrentTransceiveLimit() uses OpQueue::size_type - consider
// making the two consistent.
54 for (
unsigned int i=0; i!=m_ops.size(); ++i)
55 total += m_ops[i].second;
// front() is only evaluated when total >= m_maxBytesPerSecond.
// NOTE(review): the 1000 offset presumably means one second in the timer's
// unit (milliseconds?) - confirm against m_timer's configuration.
56 m_nextTransceiveTime =
57 (total < m_maxBytesPerSecond) ? curTime : m_ops.front().first + 1000;
// LimitedBandwidth::GetCurTimeAndCleanUp()
// Reads the current time from m_timer and then loops while the oldest m_ops
// entry is more than 1000 time units older than that reading. The loop body
// and the return statement are not visible in this listing; presumably the
// body removes the front entry and the function returns curTime - TODO confirm.
60 double LimitedBandwidth::GetCurTimeAndCleanUp()
62 if (!m_maxBytesPerSecond)
65 double curTime = m_timer.ElapsedTimeAsDouble();
// m_ops.size() is checked first so front() is never evaluated on an empty queue.
66 while (m_ops.size() && (m_ops.front().first + 1000 < curTime))
// Body fragment; the function signature is not visible in this listing. The
// CallStack label below names it LimitedBandwidth::GetWaitObjects().
// Schedules a timed event on `container` only when TimeToNextTransceive()
// returns a nonzero value.
73 double nextTransceiveTime = TimeToNextTransceive();
74 if (nextTransceiveTime)
75 container.ScheduleEvent(nextTransceiveTime,
CallStack(
"LimitedBandwidth::GetWaitObjects()", &callStack));
// NonblockingSource::GeneralPump2(byteCount, blockingOutput, maxTime,
//                                 checkDelimiter, delimiter)
// Pumps data through DoPump(), applying the bandwidth limit when
// GetMaxBytesPerSecond() is nonzero. byteCount is passed by reference: its
// value on entry is captured as the maximum to transfer (maxSize) and it is
// later incremented by the amounts pumped.
// Several lines (loop headers, returns, the declarations of `forever` and
// `container`, any reset of byteCount) are not visible in this listing.
80 size_t NonblockingSource::GeneralPump2(
81 lword& byteCount,
bool blockingOutput,
82 unsigned long maxTime,
bool checkDelimiter,
byte delimiter)
// Start every call with the speed-limit flag cleared; it is set again below
// only if the required wait exceeds the remaining time.
84 m_blockedBySpeedLimit =
false;
// No bandwidth limit configured: a single DoPump() call with the caller's
// own arguments. A nonzero DoPump() result is remembered as "blocked".
86 if (!GetMaxBytesPerSecond())
88 size_t ret = DoPump(byteCount, blockingOutput, maxTime, checkDelimiter, delimiter);
89 m_doPumpBlocked = (ret != 0);
94 unsigned long timeToGo = maxTime;
96 lword maxSize = byteCount;
// Allowance for this round: the smaller of the current bandwidth budget and
// what is still outstanding.
103 lword curMaxSize =
UnsignedMin(ComputeCurrentTransceiveLimit(), maxSize - byteCount);
// DoPump() is still called with a zero allowance when the previous DoPump()
// reported being blocked.
105 if (curMaxSize || m_doPumpBlocked)
108 size_t ret = DoPump(curMaxSize, blockingOutput, timeToGo, checkDelimiter, delimiter);
109 m_doPumpBlocked = (ret != 0);
// curMaxSize is passed to DoPump() above and then used as the amount actually
// transferred (presumably DoPump() updates it through a reference parameter,
// as NetworkSource::DoPump's `lword &byteCount` does).
112 NoteTransceive(curMaxSize);
113 byteCount += curMaxSize;
// NOTE(review): maxSize is an lword but the "unlimited" sentinel tested here
// is ULONG_MAX; if lword is wider than unsigned long on some target the two
// differ - confirm which sentinel callers pass.
119 if (maxSize != ULONG_MAX && byteCount >= maxSize)
129 double waitTime = TimeToNextTransceive();
// The bandwidth limiter needs longer than the caller allows: report being
// blocked by the speed limit instead of waiting.
130 if (!forever && waitTime > timeToGo)
132 m_blockedBySpeedLimit =
true;
// Otherwise wait on the limiter's objects; waitTime is truncated from double
// to unsigned long for the Wait() call.
137 LimitedBandwidth::GetWaitObjects(container,
CallStack(
"NonblockingSource::GeneralPump2() - speed limit", 0));
138 container.Wait((
unsigned long)waitTime);
// NonblockingSource::PumpMessages2(messageCount, blocking)
// Only part of the body is visible in this listing. The visible part tests
// for a zero messageCount and, once the source is exhausted and no
// message-end has been sent yet, forwards an empty Put2() to the attached
// transformation and records that in m_messageEndSent.
144 size_t NonblockingSource::PumpMessages2(
unsigned int &messageCount,
bool blocking)
146 if (messageCount == 0)
157 if (!m_messageEndSent && SourceExhausted())
// Zero-length Put2 whose third argument is GetAutoSignalPropagation()
// (presumably the messageEnd/propagation value) and whose last argument is
// true. RETURN_IF_NONZERO presumably returns early when Put2() reports a
// nonzero result, in which case the flag below is not set - confirm against
// the macro definition.
159 RETURN_IF_NONZERO(AttachedTransformation()->Put2(NULL, 0, GetAutoSignalPropagation(),
true));
160 m_messageEndSent =
true;
// NonblockingSink::TimedFlush(maxTime, targetSize)
// Flushes buffered data through DoFlush() until the buffer holds no more than
// targetSize bytes, applying the bandwidth limit when GetMaxBytesPerSecond()
// is nonzero. Loop headers, most returns, the updates of totalFlushed and
// curBufSize, and the declarations of `forever` and `container` are on lines
// not visible in this listing.
166 lword NonblockingSink::TimedFlush(
unsigned long maxTime,
size_t targetSize)
// Start with the speed-limit flag cleared; it is set again below only if the
// required wait exceeds the remaining time.
168 m_blockedBySpeedLimit =
false;
170 size_t curBufSize = GetCurrentBufferSize();
// Already at or below the target. With targetSize == 0 this only holds when
// no EOF is pending, so a pending EOF still forces a flush.
171 if (curBufSize <= targetSize && (targetSize || !EofPending()))
// No bandwidth limit configured: delegate directly.
174 if (!GetMaxBytesPerSecond())
175 return DoFlush(maxTime, targetSize);
178 unsigned long timeToGo = maxTime;
180 lword totalFlushed = 0;
// Amount to flush this round: the excess over the target, capped by the
// current bandwidth budget.
186 size_t flushSize =
UnsignedMin(curBufSize - targetSize, ComputeCurrentTransceiveLimit());
187 if (flushSize || EofPending())
// DoFlush() is given the buffer size to flush down to; its lword result is
// narrowed to size_t here.
190 size_t ret = (size_t)DoFlush(timeToGo, curBufSize - flushSize);
// Same "target reached" test as at the top of the function (curBufSize is
// presumably refreshed on a line not shown).
199 if (curBufSize <= targetSize && (targetSize || !EofPending()))
209 double waitTime = TimeToNextTransceive();
// The bandwidth limiter needs longer than the caller allows: report being
// blocked by the speed limit instead of waiting.
210 if (!forever && waitTime > timeToGo)
212 m_blockedBySpeedLimit =
true;
// Otherwise wait on the limiter's objects; waitTime is truncated from double
// to unsigned long for the Wait() call.
217 LimitedBandwidth::GetWaitObjects(container,
CallStack(
"NonblockingSink::TimedFlush() - speed limit", 0));
218 container.Wait((
unsigned long)waitTime);
// NonblockingSink::IsolatedFlush(hardFlush, blocking)
// The visible return yields true only for a hard flush that still has
// buffered data or a pending EOF. The statement(s) before the return are not
// visible in this listing (presumably a flush attempt - TODO confirm).
224 bool NonblockingSink::IsolatedFlush(
bool hardFlush,
bool blocking)
// `!!` converts the buffer size to bool.
227 return hardFlush && (!!GetCurrentBufferSize() || EofPending());
// Constructor initializer list; the constructor's signature line is not
// visible in this listing (the NonblockingSource base and the members used by
// NetworkSource::DoPump suggest this is NetworkSource's constructor - TODO confirm).
// m_buf is constructed with 1024*16; both buffer cursors and m_putSize start
// at 0, and neither a pending receive result nor blocked output is flagged.
233 : NonblockingSource(attachment), m_buf(1024*16)
234 , m_putSize(0), m_dataBegin(0), m_dataEnd(0)
235 , m_waitingForResult(false), m_outputBlocked(false)
// NetworkSource::GetMaxWaitObjectCount() const
// Returns the sum of the maximum wait-object counts of the three parties this
// source may wait on: the bandwidth limiter, the receiver, and the attached
// transformation.
239 unsigned int NetworkSource::GetMaxWaitObjectCount()
const 241 return LimitedBandwidth::GetMaxWaitObjectCount()
242 + GetReceiver().GetMaxWaitObjectCount()
243 + AttachedTransformation()->GetMaxWaitObjectCount();
// Body fragment; the function signature is not visible in this listing. The
// CallStack labels below name it NetworkSource::GetWaitObjects().
// Chooses what to wait on from the source's current state.
// Throttled by the bandwidth limit: wait on the limiter.
248 if (BlockedBySpeedLimit())
249 LimitedBandwidth::GetWaitObjects(container,
CallStack(
"NetworkSource::GetWaitObjects() - speed limit", &callStack));
// Output not blocked: wait on the receiver when the buffer is empty.
250 else if (!m_outputBlocked)
252 if (m_dataBegin == m_dataEnd)
253 AccessReceiver().GetWaitObjects(container,
CallStack(
"NetworkSource::GetWaitObjects() - no data", &callStack));
// Presumably the else-branch of the test above (the `else` line is not
// visible): buffered data exists, so no waiting is requested.
255 container.SetNoWait(
CallStack(
"NetworkSource::GetWaitObjects() - have data", &callStack));
// Which branch this call belongs to cannot be determined from the visible lines.
258 AttachedTransformation()->GetWaitObjects(container,
CallStack(
"NetworkSource::GetWaitObjects() - attachment", &callStack));
// NetworkSource::DoPump(byteCount, blockingOutput, maxTime, checkDelimiter,
//                       delimiter)
// Receives data from the receiver into m_buf and forwards buffered bytes to
// the attached transformation. byteCount is passed by reference: its value on
// entry is captured as the maximum to transfer (maxSize) and it is incremented
// by each amount forwarded. m_dataBegin/m_dataEnd delimit the unforwarded
// bytes in m_buf.
// Many lines (loop headers, returns, gotos/labels if any, the declarations of
// `forever`, `timer` and `t`) are not visible in this listing, so branch
// nesting below is partly inferred.
// NOTE(review): the trace statements cast `this` to unsigned int, which
// truncates the pointer on targets with pointers wider than int; they are
// only relevant when CRYPTOPP_TRACE_NETWORK is nonzero.
261 size_t NetworkSource::DoPump(
lword &byteCount,
bool blockingOutput,
unsigned long maxTime,
bool checkDelimiter,
byte delimiter)
263 NetworkReceiver &receiver = AccessReceiver();
265 lword maxSize = byteCount;
// Nothing buffered: more data has to be received before anything can be forwarded.
276 if (m_dataBegin == m_dataEnd)
278 if (receiver.EofReceived())
// A receive was started earlier and its result has not been collected yet.
281 if (m_waitingForResult)
283 if (receiver.MustWaitForResult() &&
285 CallStack(
"NetworkSource::DoPump() - wait receive result", 0)))
288 unsigned int recvResult = receiver.GetReceiveResult();
289 #if CRYPTOPP_TRACE_NETWORK 290 OutputDebugString((
IntToString((
unsigned int)
this) +
": Received " +
IntToString(recvResult) +
" bytes\n").c_str());
// The received bytes extend the valid region of m_buf.
292 m_dataEnd += recvResult;
293 m_waitingForResult =
false;
295 if (!receiver.MustWaitToReceive() && !receiver.EofReceived() && m_dataEnd != m_buf.size())
// Rewind both cursors to the start of the buffer (the condition guarding this
// is on a line not shown).
300 m_dataEnd = m_dataBegin = 0;
302 if (receiver.MustWaitToReceive())
305 CallStack(
"NetworkSource::DoPump() - wait receive", 0)))
// Start a receive into the free tail of m_buf and flag its result as pending.
308 receiver.Receive(m_buf+m_dataEnd, m_buf.size()-m_dataEnd);
309 m_waitingForResult =
true;
314 m_waitingForResult =
true;
317 #if CRYPTOPP_TRACE_NETWORK 318 OutputDebugString((
IntToString((
unsigned int)
this) +
": Receiving " +
IntToString(m_buf.size()-m_dataEnd) +
" bytes\n").c_str());
// Keep receiving for as long as Receive() returns true (presumably meaning
// the receive completed immediately - TODO confirm).
320 while (receiver.Receive(m_buf+m_dataEnd, m_buf.size()-m_dataEnd))
322 unsigned int recvResult = receiver.GetReceiveResult();
323 #if CRYPTOPP_TRACE_NETWORK 324 OutputDebugString((
IntToString((
unsigned int)
this) +
": Received " +
IntToString(recvResult) +
" bytes\n").c_str());
326 m_dataEnd += recvResult;
// Tested on EOF or once more than half of m_buf is filled; the statements
// guarded by this test are only partly visible.
327 if (receiver.EofReceived() || m_dataEnd > m_buf.size() /2)
329 m_waitingForResult =
false;
// Amount to forward: the buffered bytes, capped by what the caller still allows.
338 m_putSize =
UnsignedMin(m_dataEnd - m_dataBegin, maxSize - byteCount);
// Shrink m_putSize to the number of bytes preceding the first delimiter in the
// range (presumably only when checkDelimiter is set; the guard is not visible).
341 m_putSize = std::find(m_buf+m_dataBegin, m_buf+m_dataBegin+m_putSize, delimiter) - (m_buf+m_dataBegin);
// Hand the bytes to the attachment with messageEnd = 0; the call blocks when
// either `forever` or blockingOutput is set.
344 size_t result = t->
PutModifiable2(m_buf+m_dataBegin, m_putSize, 0, forever || blockingOutput);
348 CallStack(
"NetworkSource::DoPump() - wait attachment", 0)))
352 m_outputBlocked =
true;
356 m_outputBlocked =
false;
// Account for the bytes just forwarded.
358 byteCount += m_putSize;
359 m_dataBegin += m_putSize;
// The next unforwarded byte is the delimiter (the action taken is not visible).
360 if (checkDelimiter && m_dataBegin < m_dataEnd && m_buf[m_dataBegin] == delimiter)
// NOTE(review): maxSize is an lword but the "unlimited" sentinel tested here
// is ULONG_MAX; if lword is wider than unsigned long on some target the two
// differ - confirm which sentinel callers pass.
362 if (maxSize != ULONG_MAX && byteCount == maxSize)
// A maxTime of 0 never triggers this time-budget test.
367 if (maxTime > 0 && timer.ElapsedTime() > maxTime)
// NetworkSink::NetworkSink(maxBufferSize, autoFlushBound)
// Stores the two limits and initialises the sink's state: no send result
// outstanding, not blocked, EOF state EOF_NONE, no bytes to skip, and all
// speed statistics at zero.
377 NetworkSink::NetworkSink(
unsigned int maxBufferSize,
unsigned int autoFlushBound)
378 : m_maxBufferSize(maxBufferSize), m_autoFlushBound(autoFlushBound)
379 , m_needSendResult(false), m_wasBlocked(false), m_eofState(EOF_NONE)
// m_buffer is constructed with the smaller of 16U*1024U+256 and maxBufferSize.
380 , m_buffer(
STDMIN(16U*1024U+256, maxBufferSize)), m_skipBytes(0)
// The speed timer is configured with Timer::MILLISECONDS, which is what the
// 1000 constants in ComputeCurrentSpeed() are compared against.
381 , m_speedTimer(
Timer::MILLISECONDS), m_byteCountSinceLastTimerReset(0)
382 , m_currentSpeed(0), m_maxObservedSpeed(0)
// NetworkSink::ComputeCurrentSpeed()
// Returns m_currentSpeed, refreshing it first when more than 1000 timer units
// have elapsed on m_speedTimer (constructed with Timer::MILLISECONDS in the
// NetworkSink constructor).
386 float NetworkSink::ComputeCurrentSpeed()
388 if (m_speedTimer.ElapsedTime() > 1000)
// Bytes counted since the last reset, scaled by 1000 and divided by the
// elapsed time.
// NOTE(review): ElapsedTime() is read a second time here rather than reusing
// the value tested above; the types of the operands are not visible, so
// whether this division is integral cannot be confirmed from this listing.
390 m_currentSpeed = m_byteCountSinceLastTimerReset * 1000 / m_speedTimer.ElapsedTime();
// The previous maximum is scaled by 0.98 before being compared, so the
// recorded maximum decays when the current speed stays below it.
391 m_maxObservedSpeed =
STDMAX(m_currentSpeed, m_maxObservedSpeed * 0.98
f);
// Start a new measurement interval.
392 m_byteCountSinceLastTimerReset = 0;
393 m_speedTimer.StartTimer();
396 return m_currentSpeed;
// NetworkSink::GetMaxObservedSpeed() const
// Returns m_maxObservedSpeed, capped at GetMaxBytesPerSecond() when that
// value is nonzero.
399 float NetworkSink::GetMaxObservedSpeed()
const 401 lword m = GetMaxBytesPerSecond();
// The lword limit is converted to float so both STDMIN arguments have the
// same type.
402 return m ?
STDMIN(m_maxObservedSpeed, static_cast<float>(m)) : m_maxObservedSpeed;
// NetworkSink::GetMaxWaitObjectCount() const
// Returns the sum of the maximum wait-object counts of the bandwidth limiter
// and the sender.
405 unsigned int NetworkSink::GetMaxWaitObjectCount()
const 407 return LimitedBandwidth::GetMaxWaitObjectCount() + GetSender().GetMaxWaitObjectCount();
// Body fragment; the function signature is not visible in this listing. The
// CallStack labels below name it NetworkSink::GetWaitObjects().
// Chooses what to wait on from the sink's current state. The first matching
// condition wins; when none matches, no visible line adds a wait object.
// Throttled by the bandwidth limit: wait on the limiter.
412 if (BlockedBySpeedLimit())
413 LimitedBandwidth::GetWaitObjects(container,
CallStack(
"NetworkSink::GetWaitObjects() - speed limit", &callStack));
// The three remaining cases all wait on the sender and differ only in the
// CallStack label recorded.
414 else if (m_wasBlocked)
415 AccessSender().GetWaitObjects(container,
CallStack(
"NetworkSink::GetWaitObjects() - was blocked", &callStack));
416 else if (!m_buffer.IsEmpty())
417 AccessSender().GetWaitObjects(container,
CallStack(
"NetworkSink::GetWaitObjects() - buffer not empty", &callStack));
418 else if (EofPending())
419 AccessSender().GetWaitObjects(container,
CallStack(
"NetworkSink::GetWaitObjects() - EOF pending", &callStack));
// NetworkSink::Put2(inString, length, messageEnd, blocking)
// Appends the input to m_buffer and, depending on `blocking` and the
// auto-flush bound, tries to flush. A nonzero return (at least 1) is produced
// on the visible path where the buffer is still above the target size.
// The flush call itself, the error handling for input after EOF, and several
// returns are on lines not visible in this listing.
422 size_t NetworkSink::Put2(
const byte *inString,
size_t length,
int messageEnd,
bool blocking)
// EOF handling is complete: further data or another message end is tested
// for here (the action taken is not visible).
424 if (m_eofState == EOF_DONE)
426 if (length || messageEnd)
432 if (m_eofState > EOF_NONE)
// Skip the first m_skipBytes bytes of the input. m_skipBytes is increased by
// `length` below when the call ends up blocked, so this presumably avoids
// buffering the same bytes twice when the caller retries - TODO confirm.
439 inString += m_skipBytes;
440 length -= m_skipBytes;
443 m_buffer.Put(inString, length);
// Tested for non-blocking calls, or once the buffer has grown past the
// auto-flush bound.
445 if (!blocking || m_buffer.CurrentSize() > m_autoFlushBound)
// On a message end everything must go out (target 0); otherwise the buffer
// may hold up to m_maxBufferSize.
448 size_t targetSize = messageEnd ? 0 : m_maxBufferSize;
// Still above the target: remember how much of this input is already
// buffered and report the call as blocked.
452 if (m_buffer.CurrentSize() > targetSize)
456 m_skipBytes += length;
457 size_t blockedBytes =
UnsignedMin(length, m_buffer.CurrentSize() - targetSize);
// STDMAX with 1 guarantees a nonzero return on this path.
458 return STDMAX<size_t>(blockedBytes, 1);
461 m_wasBlocked =
false;
// The condition guarding this assignment is not visible (presumably messageEnd).
467 m_eofState = EOF_PENDING_SEND;
471 if (m_eofState != EOF_DONE)
// NetworkSink::DoFlush(maxTime, targetSize)
// Sends buffered data through the sender until m_buffer holds no more than
// targetSize bytes, then drives the EOF state machine
// (EOF_PENDING_SEND -> EOF_PENDING_DELIVERY -> EOF_DONE). Returns the number
// of bytes confirmed sent during this call.
// Loop headers, breaks, the declaration of `timer`, and the call that sends
// EOF are on lines not visible in this listing.
// NOTE(review): the trace statements cast `this` to unsigned int, which
// truncates the pointer on targets with pointers wider than int; they are
// only relevant when CRYPTOPP_TRACE_NETWORK is nonzero.
478 lword NetworkSink::DoFlush(
unsigned long maxTime,
size_t targetSize)
480 NetworkSender &sender = AccessSender();
// NOTE(review): the counter is unsigned int while the function returns lword.
484 unsigned int totalFlushSize = 0;
488 if (m_buffer.CurrentSize() <= targetSize)
// A send was started earlier and its result has not been collected yet.
491 if (m_needSendResult)
493 if (sender.MustWaitForResult() &&
495 CallStack(
"NetworkSink::DoFlush() - wait send result", 0)))
498 unsigned int sendResult = sender.GetSendResult();
499 #if CRYPTOPP_TRACE_NETWORK 500 OutputDebugString((
IntToString((
unsigned int)
this) +
": Sent " +
IntToString(sendResult) +
" bytes\n").c_str());
// Only the bytes the sender reports as sent are removed from the buffer.
502 m_buffer.Skip(sendResult);
503 totalFlushSize += sendResult;
504 m_needSendResult =
false;
506 if (!m_buffer.AnyRetrievable())
// Remaining time budget, clamped at 0; a maxTime of 0 yields a timeout of 0.
510 unsigned long timeOut = maxTime ?
SaturatingSubtract(maxTime, timer.ElapsedTime()) : 0;
511 if (sender.MustWaitToSend() && !sender.Wait(timeOut,
CallStack(
"NetworkSink::DoFlush() - wait send", 0)))
// Spy() exposes a block of the buffer without removing it and reports its
// size through contiguousSize; the bytes are removed by Skip() above once the
// send result is known.
514 size_t contiguousSize = 0;
515 const byte *block = m_buffer.Spy(contiguousSize);
517 #if CRYPTOPP_TRACE_NETWORK 518 OutputDebugString((
IntToString((
unsigned int)
this) +
": Sending " +
IntToString(contiguousSize) +
" bytes\n").c_str());
520 sender.Send(block, contiguousSize);
521 m_needSendResult =
true;
// A time limit was given and it has run out.
523 if (maxTime > 0 && timeOut == 0)
// Feed the speed statistics with what was sent during this call.
527 m_byteCountSinceLastTimerReset += totalFlushSize;
528 ComputeCurrentSpeed();
// Everything has been sent and confirmed: EOF handling can proceed.
530 if (m_buffer.IsEmpty() && !m_needSendResult)
532 if (m_eofState == EOF_PENDING_SEND)
// If the sender does not need to wait for EOF, EOF handling finishes at once.
535 m_eofState = sender.MustWaitForEof() ? EOF_PENDING_DELIVERY : EOF_DONE;
538 while (m_eofState == EOF_PENDING_DELIVERY)
540 unsigned long timeOut = maxTime ?
SaturatingSubtract(maxTime, timer.ElapsedTime()) : 0;
541 if (!sender.Wait(timeOut,
CallStack(
"NetworkSink::DoFlush() - wait EOF", 0)))
544 if (sender.EofSent())
545 m_eofState = EOF_DONE;
549 return totalFlushSize;
554 #endif // #ifdef SOCKETS_AVAILABLE Base class for all exceptions thrown by the library.
#define NAMESPACE_BEGIN(x)
Some other error occurred not belonging to other categories.
T1 SaturatingSubtract(const T1 &a, const T2 &b)
Performs a saturating subtract clamped at 0.
const T1 UnsignedMin(const T1 &a, const T2 &b)
Safe comparison of values that could be negative and incorrectly promoted.
const T & STDMIN(const T &a, const T &b)
Replacement function for std::min.
#define CRYPTOPP_ASSERT(exp)
const unsigned long INFINITE_TIME
Represents infinite time.
bool Wait(unsigned long milliseconds, CallStack const &callStack)
Wait on this object.
uint8_t const size_t const size
#define CRYPTOPP_UNUSED(x)
std::string IntToString(T value, unsigned int base=10)
Converts a value to a string.
const T & STDMAX(const T &a, const T &b)
Replacement function for std::max.
#define RETURN_IF_NONZERO(x)