diff --git a/trantor/net/TcpConnection.h b/trantor/net/TcpConnection.h index 41626bcd..a71e7eff 100644 --- a/trantor/net/TcpConnection.h +++ b/trantor/net/TcpConnection.h @@ -359,7 +359,12 @@ class TRANTOR_EXPORT TcpConnection { sslErrorCallback_ = std::move(cb); } - + /** + * @brief Get the data length in the sending buffer. The sending buffer is + * in the user memory space. + * @note This method should be called in the right event loop. + */ + virtual size_t getBufferedDataLength() const = 0; // TODO: These should be internal APIs virtual void connectEstablished() = 0; virtual void connectDestroyed() = 0; diff --git a/trantor/net/inner/StreamBufferNode.cc b/trantor/net/inner/StreamBufferNode.cc index 0376ac9c..508cab4d 100644 --- a/trantor/net/inner/StreamBufferNode.cc +++ b/trantor/net/inner/StreamBufferNode.cc @@ -45,6 +45,8 @@ class StreamBufferNode : public BufferNode { if (isDone_) return 0; + if (msgBuffer_.readableBytes() > 0) + return static_cast(msgBuffer_.readableBytes()); return 1; } ~StreamBufferNode() override @@ -52,6 +54,10 @@ class StreamBufferNode : public BufferNode if (streamCallback_) streamCallback_(nullptr, 0); // cleanup callback internals } + bool available() const override + { + return !isDone_; + } private: std::function streamCallback_; diff --git a/trantor/net/inner/TcpConnectionImpl.cc b/trantor/net/inner/TcpConnectionImpl.cc index 6a75707b..34fa98b9 100644 --- a/trantor/net/inner/TcpConnectionImpl.cc +++ b/trantor/net/inner/TcpConnectionImpl.cc @@ -83,7 +83,12 @@ TcpConnectionImpl::TcpConnectionImpl(EventLoop *loop, ioChannelPtr_->setErrorCallback([this]() { handleError(); }); socketPtr_->setKeepAlive(true); name_ = localAddr.toIpPort() + "--" + peerAddr.toIpPort(); - +#ifdef _WIN32 + int size = sizeof(sendBufSize_); + ::getsockopt( + socketPtr_->fd(), SOL_SOCKET, SO_SNDBUF, (char *)&sendBufSize_, &size); + LOG_TRACE << "System sending buffer size: " << sendBufSize_ << " bytes"; +#endif if (policy != nullptr) { tlsProviderPtr_ = @@ -317,6 +322,18 @@ void TcpConnectionImpl::setTcpNoDelay(bool on) { socketPtr_->setTcpNoDelay(on); } +void TcpConnectionImpl::checkBufferedDataSize() +{ + loop_->assertInLoopThread(); + if (highWaterMarkCallback_) + { + auto bufferedDataSize = getBufferedDataLength(); + if (bufferedDataSize > highWaterMarkLen_) + { + highWaterMarkCallback_(shared_from_this(), bufferedDataSize); + } + } +} void TcpConnectionImpl::connectDestroyed() { loop_->assertInLoopThread(); @@ -412,21 +429,7 @@ void TcpConnectionImpl::sendInLoop(const char *buffer, size_t length) writeBufferList_.back()->append(static_cast(buffer) + sendLen, length); - if (highWaterMarkCallback_ && - writeBufferList_.back()->remainingBytes() > - static_cast(highWaterMarkLen_)) - { - highWaterMarkCallback_(shared_from_this(), - writeBufferList_.back()->remainingBytes()); - } - if (highWaterMarkCallback_ && tlsProviderPtr_ && - tlsProviderPtr_->getBufferedData().readableBytes() > - highWaterMarkLen_) - { - highWaterMarkCallback_( - shared_from_this(), - tlsProviderPtr_->getBufferedData().readableBytes()); - } + checkBufferedDataSize(); } } // The order of data sending should be same as the order of calls of send() @@ -595,12 +598,16 @@ void TcpConnectionImpl::sendFile(BufferNodePtr &&fileNode) { auto n = sendNodeInLoop(fileNode); if (fileNode->remainingBytes() > 0 && n >= 0) + { writeBufferList_.push_back(std::move(fileNode)); + checkBufferedDataSize(); + } return; } else { writeBufferList_.push_back(std::move(fileNode)); + checkBufferedDataSize(); } } else @@ -611,11 +618,15 @@ void TcpConnectionImpl::sendFile(BufferNodePtr &&fileNode) { auto n = thisPtr->sendNodeInLoop(node); if (node->remainingBytes() > 0 && n >= 0) + { thisPtr->writeBufferList_.push_back(std::move(node)); + thisPtr->checkBufferedDataSize(); + } } else { thisPtr->writeBufferList_.push_back(std::move(node)); + thisPtr->checkBufferedDataSize(); } }); } @@ -631,7 +642,10 @@ void TcpConnectionImpl::sendStream( { auto n = sendNodeInLoop(node); if (node->remainingBytes() > 0 && n >= 0) + { writeBufferList_.push_back(std::move(node)); + checkBufferedDataSize(); + } return; } } @@ -644,11 +658,15 @@ void TcpConnectionImpl::sendStream( { auto n = thisPtr->sendNodeInLoop(node); if (node->remainingBytes() > 0 && n >= 0) + { thisPtr->writeBufferList_.push_back(std::move(node)); + thisPtr->checkBufferedDataSize(); + } } else { thisPtr->writeBufferList_.push_back(std::move(node)); + thisPtr->checkBufferedDataSize(); } }); } @@ -728,6 +746,31 @@ ssize_t TcpConnectionImpl::sendNodeInLoop(const BufferNodePtr &nodePtr) #ifndef _WIN32 ssize_t TcpConnectionImpl::writeRaw(const void *buffer, size_t length) #else +static int sendDataWin(int fd, const char *buffer, int length, int sendBufSize) +{ + int n = 0; + while (n < length) + { + auto toSend = length - n > sendBufSize ? sendBufSize : length - n; + int nWritten = ::send(fd, buffer + n, toSend, 0); + if (nWritten > 0) + { + n += nWritten; + if (nWritten < toSend) + break; + } + else if (nWritten == 0) + { + break; + } + else + { + errno = ::WSAGetLastError(); + break; + } + } + return n; +} ssize_t TcpConnectionImpl::writeRaw(const char *buffer, size_t length) #endif { @@ -735,9 +778,10 @@ ssize_t TcpConnectionImpl::writeRaw(const char *buffer, size_t length) #ifndef _WIN32 int nWritten = write(socketPtr_->fd(), buffer, length); #else - int nWritten = - ::send(socketPtr_->fd(), buffer, static_cast(length), 0); - errno = (nWritten < 0) ? ::WSAGetLastError() : 0; + int nWritten = sendDataWin(socketPtr_->fd(), + buffer, + static_cast(length), + sendBufSize_); #endif if (nWritten > 0) bytesSent_ += nWritten; @@ -930,8 +974,8 @@ AsyncStreamPtr TcpConnectionImpl::sendAsyncStream(bool disableKickoff) idleTimeoutBackup_ = idleTimeout_; idleTimeout_ = 0; } - writeBufferList_.push_back(asyncStreamNode); + checkBufferedDataSize(); } else { @@ -955,11 +999,15 @@ AsyncStreamPtr TcpConnectionImpl::sendAsyncStream(bool disableKickoff) { auto n = thisPtr->sendNodeInLoop(node); if (n >= 0 && (node->remainingBytes() > 0 || node->available())) + { thisPtr->writeBufferList_.push_back(std::move(node)); + thisPtr->checkBufferedDataSize(); + } } else { thisPtr->writeBufferList_.push_back(std::move(node)); + thisPtr->checkBufferedDataSize(); } }); } diff --git a/trantor/net/inner/TcpConnectionImpl.h b/trantor/net/inner/TcpConnectionImpl.h index f19729a0..3a762dc2 100644 --- a/trantor/net/inner/TcpConnectionImpl.h +++ b/trantor/net/inner/TcpConnectionImpl.h @@ -202,10 +202,23 @@ class TcpConnectionImpl : public TcpConnection, idleTimeout_ = timeout; timingWheel->insertEntry(timeout, entry); } + size_t getBufferedDataLength() const override + { + loop_->assertInLoopThread(); + size_t len = 0; + if (tlsProviderPtr_) + { + len += tlsProviderPtr_->getBufferedData().readableBytes(); + } + for (auto &node : writeBufferList_) + { + len += node->remainingBytes(); + } + return len; + } private: /// Internal use only. - std::weak_ptr kickoffEntry_; std::weak_ptr timingWheelWeakPtr_; size_t idleTimeout_{0}; @@ -213,6 +226,7 @@ class TcpConnectionImpl : public TcpConnection, Date lastTimingWheelUpdateTime_; void extendLife(); void sendFile(BufferNodePtr &&fileNode); + void checkBufferedDataSize(); protected: enum class ConnStatus @@ -249,6 +263,7 @@ class TcpConnectionImpl : public TcpConnection, ssize_t writeRaw(const char *buffer, size_t length); // -1: error, 0: EAGAIN, >0: bytes sent ssize_t writeInLoop(const char *buffer, size_t length); + int sendBufSize_; #endif size_t highWaterMarkLen_{0}; std::string name_;