From 31c0988e9f51d9320f51830a24201c39eeb99fe8 Mon Sep 17 00:00:00 2001 From: an-tao Date: Wed, 9 Oct 2024 15:56:50 +0800 Subject: [PATCH 1/3] Optimize memory usage when sending data on Windows --- trantor/net/inner/StreamBufferNode.cc | 6 +++++ trantor/net/inner/TcpConnectionImpl.cc | 37 +++++++++++++++++++++++--- trantor/net/inner/TcpConnectionImpl.h | 1 + 3 files changed, 40 insertions(+), 4 deletions(-) diff --git a/trantor/net/inner/StreamBufferNode.cc b/trantor/net/inner/StreamBufferNode.cc index 0376ac9c..508cab4d 100644 --- a/trantor/net/inner/StreamBufferNode.cc +++ b/trantor/net/inner/StreamBufferNode.cc @@ -45,6 +45,8 @@ class StreamBufferNode : public BufferNode { if (isDone_) return 0; + if (msgBuffer_.readableBytes() > 0) + return static_cast(msgBuffer_.readableBytes()); return 1; } ~StreamBufferNode() override @@ -52,6 +54,10 @@ class StreamBufferNode : public BufferNode if (streamCallback_) streamCallback_(nullptr, 0); // cleanup callback internals } + bool available() const override + { + return !isDone_; + } private: std::function streamCallback_; diff --git a/trantor/net/inner/TcpConnectionImpl.cc b/trantor/net/inner/TcpConnectionImpl.cc index 6a75707b..f407491e 100644 --- a/trantor/net/inner/TcpConnectionImpl.cc +++ b/trantor/net/inner/TcpConnectionImpl.cc @@ -83,7 +83,10 @@ TcpConnectionImpl::TcpConnectionImpl(EventLoop *loop, ioChannelPtr_->setErrorCallback([this]() { handleError(); }); socketPtr_->setKeepAlive(true); name_ = localAddr.toIpPort() + "--" + peerAddr.toIpPort(); - + int size = sizeof(sendBufSize_); + ::getsockopt( + socketPtr_->fd(), SOL_SOCKET, SO_SNDBUF, (char *)&sendBufSize_, &size); + LOG_TRACE << "Send buffer size: " << sendBufSize_ << " bytes"; if (policy != nullptr) { tlsProviderPtr_ = @@ -728,6 +731,31 @@ ssize_t TcpConnectionImpl::sendNodeInLoop(const BufferNodePtr &nodePtr) #ifndef _WIN32 ssize_t TcpConnectionImpl::writeRaw(const void *buffer, size_t length) #else +static int sendDataWin(int fd, const char *buffer, int length, int sendBufSize) +{ + int n = 0; + while (n < length) + { + auto toSend = length - n > sendBufSize ? sendBufSize : length - n; + int nWritten = ::send(fd, buffer + n, toSend, 0); + if (nWritten > 0) + { + n += nWritten; + if (nWritten < toSend) + break; + } + else if (nWritten == 0) + { + break; + } + else + { + errno = ::WSAGetLastError(); + break; + } + } + return n; +} ssize_t TcpConnectionImpl::writeRaw(const char *buffer, size_t length) #endif { @@ -735,9 +763,10 @@ ssize_t TcpConnectionImpl::writeRaw(const char *buffer, size_t length) #ifndef _WIN32 int nWritten = write(socketPtr_->fd(), buffer, length); #else - int nWritten = - ::send(socketPtr_->fd(), buffer, static_cast(length), 0); - errno = (nWritten < 0) ? ::WSAGetLastError() : 0; + int nWritten = sendDataWin(socketPtr_->fd(), + buffer, + static_cast(length), + sendBufSize_); #endif if (nWritten > 0) bytesSent_ += nWritten; diff --git a/trantor/net/inner/TcpConnectionImpl.h b/trantor/net/inner/TcpConnectionImpl.h index f19729a0..231fee58 100644 --- a/trantor/net/inner/TcpConnectionImpl.h +++ b/trantor/net/inner/TcpConnectionImpl.h @@ -249,6 +249,7 @@ class TcpConnectionImpl : public TcpConnection, ssize_t writeRaw(const char *buffer, size_t length); // -1: error, 0: EAGAIN, >0: bytes sent ssize_t writeInLoop(const char *buffer, size_t length); + int sendBufSize_; #endif size_t highWaterMarkLen_{0}; std::string name_; From 694c1db62af970a7174a71c0a6b235a88cf6d265 Mon Sep 17 00:00:00 2001 From: an-tao Date: Fri, 11 Oct 2024 18:25:47 +0800 Subject: [PATCH 2/3] Modify high water level callback --- trantor/net/TcpConnection.h | 7 +++- trantor/net/inner/TcpConnectionImpl.cc | 49 +++++++++++++++++--------- trantor/net/inner/TcpConnectionImpl.h | 16 ++++++++- 3 files changed, 54 insertions(+), 18 deletions(-) diff --git a/trantor/net/TcpConnection.h b/trantor/net/TcpConnection.h index 41626bcd..a71e7eff 100644 --- a/trantor/net/TcpConnection.h +++ b/trantor/net/TcpConnection.h @@ -359,7 +359,12 @@ class TRANTOR_EXPORT TcpConnection { sslErrorCallback_ = std::move(cb); } - + /** + * @brief Get the data length in the sending buffer. The sending buffer is + * in the user memory space. + * @note This method should be called in the right event loop. + */ + virtual size_t getBufferedDataLength() const = 0; // TODO: These should be internal APIs virtual void connectEstablished() = 0; virtual void connectDestroyed() = 0; diff --git a/trantor/net/inner/TcpConnectionImpl.cc b/trantor/net/inner/TcpConnectionImpl.cc index f407491e..c15d1607 100644 --- a/trantor/net/inner/TcpConnectionImpl.cc +++ b/trantor/net/inner/TcpConnectionImpl.cc @@ -320,6 +320,18 @@ void TcpConnectionImpl::setTcpNoDelay(bool on) { socketPtr_->setTcpNoDelay(on); } +void TcpConnectionImpl::checkBufferedDataSize() +{ + loop_->assertInLoopThread(); + if (highWaterMarkCallback_) + { + auto bufferedDataSize = getBufferedDataLength(); + if (bufferedDataSize > highWaterMarkLen_) + { + highWaterMarkCallback_(shared_from_this(), bufferedDataSize); + } + } +} void TcpConnectionImpl::connectDestroyed() { loop_->assertInLoopThread(); @@ -415,21 +427,7 @@ void TcpConnectionImpl::sendInLoop(const char *buffer, size_t length) writeBufferList_.back()->append(static_cast(buffer) + sendLen, length); - if (highWaterMarkCallback_ && - writeBufferList_.back()->remainingBytes() > - static_cast(highWaterMarkLen_)) - { - highWaterMarkCallback_(shared_from_this(), - writeBufferList_.back()->remainingBytes()); - } - if (highWaterMarkCallback_ && tlsProviderPtr_ && - tlsProviderPtr_->getBufferedData().readableBytes() > - highWaterMarkLen_) - { - highWaterMarkCallback_( - shared_from_this(), - tlsProviderPtr_->getBufferedData().readableBytes()); - } + checkBufferedDataSize(); } } // The order of data sending should be same as the order of calls of send() @@ -598,12 +596,16 @@ void TcpConnectionImpl::sendFile(BufferNodePtr &&fileNode) { auto n = sendNodeInLoop(fileNode); if (fileNode->remainingBytes() > 0 && n >= 0) + { writeBufferList_.push_back(std::move(fileNode)); + checkBufferedDataSize(); + } return; } else { writeBufferList_.push_back(std::move(fileNode)); + checkBufferedDataSize(); } } else @@ -614,11 +616,15 @@ void TcpConnectionImpl::sendFile(BufferNodePtr &&fileNode) { auto n = thisPtr->sendNodeInLoop(node); if (node->remainingBytes() > 0 && n >= 0) + { thisPtr->writeBufferList_.push_back(std::move(node)); + thisPtr->checkBufferedDataSize(); + } } else { thisPtr->writeBufferList_.push_back(std::move(node)); + thisPtr->checkBufferedDataSize(); } }); } @@ -634,7 +640,10 @@ void TcpConnectionImpl::sendStream( { auto n = sendNodeInLoop(node); if (node->remainingBytes() > 0 && n >= 0) + { writeBufferList_.push_back(std::move(node)); + checkBufferedDataSize(); + } return; } } @@ -647,11 +656,15 @@ void TcpConnectionImpl::sendStream( { auto n = thisPtr->sendNodeInLoop(node); if (node->remainingBytes() > 0 && n >= 0) + { thisPtr->writeBufferList_.push_back(std::move(node)); + thisPtr->checkBufferedDataSize(); + } } else { thisPtr->writeBufferList_.push_back(std::move(node)); + thisPtr->checkBufferedDataSize(); } }); } @@ -959,8 +972,8 @@ AsyncStreamPtr TcpConnectionImpl::sendAsyncStream(bool disableKickoff) idleTimeoutBackup_ = idleTimeout_; idleTimeout_ = 0; } - writeBufferList_.push_back(asyncStreamNode); + checkBufferedDataSize(); } else { @@ -984,11 +997,15 @@ AsyncStreamPtr TcpConnectionImpl::sendAsyncStream(bool disableKickoff) { auto n = thisPtr->sendNodeInLoop(node); if (n >= 0 && (node->remainingBytes() > 0 || node->available())) + { thisPtr->writeBufferList_.push_back(std::move(node)); + thisPtr->checkBufferedDataSize(); + } } else { thisPtr->writeBufferList_.push_back(std::move(node)); + thisPtr->checkBufferedDataSize(); } }); } diff --git a/trantor/net/inner/TcpConnectionImpl.h b/trantor/net/inner/TcpConnectionImpl.h index 231fee58..3a762dc2 100644 --- a/trantor/net/inner/TcpConnectionImpl.h +++ b/trantor/net/inner/TcpConnectionImpl.h @@ -202,10 +202,23 @@ class TcpConnectionImpl : public TcpConnection, idleTimeout_ = timeout; timingWheel->insertEntry(timeout, entry); } + size_t getBufferedDataLength() const override + { + loop_->assertInLoopThread(); + size_t len = 0; + if (tlsProviderPtr_) + { + len += tlsProviderPtr_->getBufferedData().readableBytes(); + } + for (auto &node : writeBufferList_) + { + len += node->remainingBytes(); + } + return len; + } private: /// Internal use only. - std::weak_ptr kickoffEntry_; std::weak_ptr timingWheelWeakPtr_; size_t idleTimeout_{0}; @@ -213,6 +226,7 @@ class TcpConnectionImpl : public TcpConnection, Date lastTimingWheelUpdateTime_; void extendLife(); void sendFile(BufferNodePtr &&fileNode); + void checkBufferedDataSize(); protected: enum class ConnStatus From b6fc6d7ecce0f692970c9998c313ee17db8d942b Mon Sep 17 00:00:00 2001 From: an-tao Date: Fri, 11 Oct 2024 18:33:14 +0800 Subject: [PATCH 3/3] macro --- trantor/net/inner/TcpConnectionImpl.cc | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/trantor/net/inner/TcpConnectionImpl.cc b/trantor/net/inner/TcpConnectionImpl.cc index c15d1607..34fa98b9 100644 --- a/trantor/net/inner/TcpConnectionImpl.cc +++ b/trantor/net/inner/TcpConnectionImpl.cc @@ -83,10 +83,12 @@ TcpConnectionImpl::TcpConnectionImpl(EventLoop *loop, ioChannelPtr_->setErrorCallback([this]() { handleError(); }); socketPtr_->setKeepAlive(true); name_ = localAddr.toIpPort() + "--" + peerAddr.toIpPort(); +#ifdef _WIN32 int size = sizeof(sendBufSize_); ::getsockopt( socketPtr_->fd(), SOL_SOCKET, SO_SNDBUF, (char *)&sendBufSize_, &size); - LOG_TRACE << "Send buffer size: " << sendBufSize_ << " bytes"; + LOG_TRACE << "System sending buffer size: " << sendBufSize_ << " bytes"; +#endif if (policy != nullptr) { tlsProviderPtr_ =