Skip to content

Optimize memory usage when sending data on Windows #359

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion trantor/net/TcpConnection.h
Original file line number Diff line number Diff line change
Expand Up @@ -359,7 +359,12 @@ class TRANTOR_EXPORT TcpConnection
{
sslErrorCallback_ = std::move(cb);
}

/**
* @brief Get the data length in the sending buffer. The sending buffer is
* in the user memory space.
* @note This method should be called in the right event loop.
*/
virtual size_t getBufferedDataLength() const = 0;
// TODO: These should be internal APIs
virtual void connectEstablished() = 0;
virtual void connectDestroyed() = 0;
Expand Down
6 changes: 6 additions & 0 deletions trantor/net/inner/StreamBufferNode.cc
Original file line number Diff line number Diff line change
Expand Up @@ -45,13 +45,19 @@ class StreamBufferNode : public BufferNode
{
if (isDone_)
return 0;
if (msgBuffer_.readableBytes() > 0)
return static_cast<long long>(msgBuffer_.readableBytes());
return 1;
}
~StreamBufferNode() override
{
if (streamCallback_)
streamCallback_(nullptr, 0); // cleanup callback internals
}
bool available() const override
{
return !isDone_;
}

private:
std::function<std::size_t(char *, std::size_t)> streamCallback_;
Expand Down
88 changes: 68 additions & 20 deletions trantor/net/inner/TcpConnectionImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,12 @@ TcpConnectionImpl::TcpConnectionImpl(EventLoop *loop,
ioChannelPtr_->setErrorCallback([this]() { handleError(); });
socketPtr_->setKeepAlive(true);
name_ = localAddr.toIpPort() + "--" + peerAddr.toIpPort();

#ifdef _WIN32
int size = sizeof(sendBufSize_);
::getsockopt(
socketPtr_->fd(), SOL_SOCKET, SO_SNDBUF, (char *)&sendBufSize_, &size);
LOG_TRACE << "System sending buffer size: " << sendBufSize_ << " bytes";
#endif
if (policy != nullptr)
{
tlsProviderPtr_ =
Expand Down Expand Up @@ -317,6 +322,18 @@ void TcpConnectionImpl::setTcpNoDelay(bool on)
{
socketPtr_->setTcpNoDelay(on);
}
void TcpConnectionImpl::checkBufferedDataSize()
{
loop_->assertInLoopThread();
if (highWaterMarkCallback_)
{
auto bufferedDataSize = getBufferedDataLength();
if (bufferedDataSize > highWaterMarkLen_)
{
highWaterMarkCallback_(shared_from_this(), bufferedDataSize);
}
}
}
void TcpConnectionImpl::connectDestroyed()
{
loop_->assertInLoopThread();
Expand Down Expand Up @@ -412,21 +429,7 @@ void TcpConnectionImpl::sendInLoop(const char *buffer, size_t length)
writeBufferList_.back()->append(static_cast<const char *>(buffer) +
sendLen,
length);
if (highWaterMarkCallback_ &&
writeBufferList_.back()->remainingBytes() >
static_cast<long long>(highWaterMarkLen_))
{
highWaterMarkCallback_(shared_from_this(),
writeBufferList_.back()->remainingBytes());
}
if (highWaterMarkCallback_ && tlsProviderPtr_ &&
tlsProviderPtr_->getBufferedData().readableBytes() >
highWaterMarkLen_)
{
highWaterMarkCallback_(
shared_from_this(),
tlsProviderPtr_->getBufferedData().readableBytes());
}
checkBufferedDataSize();
}
}
// The order of data sending should be same as the order of calls of send()
Expand Down Expand Up @@ -595,12 +598,16 @@ void TcpConnectionImpl::sendFile(BufferNodePtr &&fileNode)
{
auto n = sendNodeInLoop(fileNode);
if (fileNode->remainingBytes() > 0 && n >= 0)
{
writeBufferList_.push_back(std::move(fileNode));
checkBufferedDataSize();
}
return;
}
else
{
writeBufferList_.push_back(std::move(fileNode));
checkBufferedDataSize();
}
}
else
Expand All @@ -611,11 +618,15 @@ void TcpConnectionImpl::sendFile(BufferNodePtr &&fileNode)
{
auto n = thisPtr->sendNodeInLoop(node);
if (node->remainingBytes() > 0 && n >= 0)
{
thisPtr->writeBufferList_.push_back(std::move(node));
thisPtr->checkBufferedDataSize();
}
}
else
{
thisPtr->writeBufferList_.push_back(std::move(node));
thisPtr->checkBufferedDataSize();
}
});
}
Expand All @@ -631,7 +642,10 @@ void TcpConnectionImpl::sendStream(
{
auto n = sendNodeInLoop(node);
if (node->remainingBytes() > 0 && n >= 0)
{
writeBufferList_.push_back(std::move(node));
checkBufferedDataSize();
}
return;
}
}
Expand All @@ -644,11 +658,15 @@ void TcpConnectionImpl::sendStream(
{
auto n = thisPtr->sendNodeInLoop(node);
if (node->remainingBytes() > 0 && n >= 0)
{
thisPtr->writeBufferList_.push_back(std::move(node));
thisPtr->checkBufferedDataSize();
}
}
else
{
thisPtr->writeBufferList_.push_back(std::move(node));
thisPtr->checkBufferedDataSize();
}
});
}
Expand Down Expand Up @@ -728,16 +746,42 @@ ssize_t TcpConnectionImpl::sendNodeInLoop(const BufferNodePtr &nodePtr)
#ifndef _WIN32
ssize_t TcpConnectionImpl::writeRaw(const void *buffer, size_t length)
#else
static int sendDataWin(int fd, const char *buffer, int length, int sendBufSize)
{
int n = 0;
Comment on lines 746 to +751
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This ifndef is really confusing for no good reason. The function deceleration ssize_t TcpConnectionImpl::writeRaw(const void *buffer, size_t length) is also repeated once at the end of the else clause. We should clean this up.

while (n < length)
{
auto toSend = length - n > sendBufSize ? sendBufSize : length - n;
int nWritten = ::send(fd, buffer + n, toSend, 0);
if (nWritten > 0)
{
n += nWritten;
if (nWritten < toSend)
break;
}
else if (nWritten == 0)
{
break;
}
else
{
errno = ::WSAGetLastError();
break;
}
}
return n;
}
ssize_t TcpConnectionImpl::writeRaw(const char *buffer, size_t length)
#endif
{
// TODO: Abstract this away to support io_uring (and IOCP?)
#ifndef _WIN32
int nWritten = write(socketPtr_->fd(), buffer, length);
#else
int nWritten =
::send(socketPtr_->fd(), buffer, static_cast<int>(length), 0);
errno = (nWritten < 0) ? ::WSAGetLastError() : 0;
int nWritten = sendDataWin(socketPtr_->fd(),
buffer,
static_cast<int>(length),
sendBufSize_);
#endif
if (nWritten > 0)
bytesSent_ += nWritten;
Expand Down Expand Up @@ -930,8 +974,8 @@ AsyncStreamPtr TcpConnectionImpl::sendAsyncStream(bool disableKickoff)
idleTimeoutBackup_ = idleTimeout_;
idleTimeout_ = 0;
}

writeBufferList_.push_back(asyncStreamNode);
checkBufferedDataSize();
}
else
{
Expand All @@ -955,11 +999,15 @@ AsyncStreamPtr TcpConnectionImpl::sendAsyncStream(bool disableKickoff)
{
auto n = thisPtr->sendNodeInLoop(node);
if (n >= 0 && (node->remainingBytes() > 0 || node->available()))
{
thisPtr->writeBufferList_.push_back(std::move(node));
thisPtr->checkBufferedDataSize();
}
}
else
{
thisPtr->writeBufferList_.push_back(std::move(node));
thisPtr->checkBufferedDataSize();
}
});
}
Expand Down
17 changes: 16 additions & 1 deletion trantor/net/inner/TcpConnectionImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -202,17 +202,31 @@ class TcpConnectionImpl : public TcpConnection,
idleTimeout_ = timeout;
timingWheel->insertEntry(timeout, entry);
}
size_t getBufferedDataLength() const override
{
loop_->assertInLoopThread();
size_t len = 0;
if (tlsProviderPtr_)
{
len += tlsProviderPtr_->getBufferedData().readableBytes();
}
for (auto &node : writeBufferList_)
{
len += node->remainingBytes();
}
return len;
}

private:
/// Internal use only.

std::weak_ptr<KickoffEntry> kickoffEntry_;
std::weak_ptr<TimingWheel> timingWheelWeakPtr_;
size_t idleTimeout_{0};
size_t idleTimeoutBackup_{0};
Date lastTimingWheelUpdateTime_;
void extendLife();
void sendFile(BufferNodePtr &&fileNode);
void checkBufferedDataSize();

protected:
enum class ConnStatus
Expand Down Expand Up @@ -249,6 +263,7 @@ class TcpConnectionImpl : public TcpConnection,
ssize_t writeRaw(const char *buffer, size_t length);
// -1: error, 0: EAGAIN, >0: bytes sent
ssize_t writeInLoop(const char *buffer, size_t length);
int sendBufSize_;
#endif
size_t highWaterMarkLen_{0};
std::string name_;
Expand Down