Skip to content

Simplify the code for sending data #308

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 17 commits into
base: master
Choose a base branch
from
Next Next commit
Simplify the code for sending data
  • Loading branch information
an-tao committed Dec 16, 2023
commit 42ba406aea0c0474c924a9a756b117dcbcf8b3ba
9 changes: 8 additions & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,8 @@ set(TRANTOR_SOURCES
trantor/net/inner/Connector.cc
trantor/net/inner/Poller.cc
trantor/net/inner/Socket.cc
trantor/net/inner/MemBufferNode.cc
trantor/net/inner/StreamBufferNode.cc
trantor/net/inner/TcpConnectionImpl.cc
trantor/net/inner/Timer.cc
trantor/net/inner/TimerQueue.cc
Expand All @@ -126,11 +128,16 @@ if(WIN32)
set(TRANTOR_SOURCES
${TRANTOR_SOURCES}
third_party/wepoll/Wepoll.c
trantor/utils/WindowsSupport.cc)
trantor/utils/WindowsSupport.cc
trantor/net/inner/FileBufferNodeWin.cc)
set(private_headers
${private_headers}
third_party/wepoll/Wepoll.h
trantor/utils/WindowsSupport.h)
else(WIN32)
set(TRANTOR_SOURCES
${TRANTOR_SOURCES}
trantor/net/inner/FileBufferNodeUnix.cc)
endif(WIN32)

# Somehow the default value of TRANTOR_USE_TLS is OFF
Expand Down
73 changes: 73 additions & 0 deletions trantor/net/inner/BufferNode.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/**
*
* @file BufferNode.h
* @author An Tao
*
* Public header file in trantor lib.
*
* Copyright 2018, An Tao. All rights reserved.
* Use of this source code is governed by a BSD-style license
* that can be found in the License file.
*
*
*/

#pragma once
#ifdef _WIN32
#include <stdio.h>
#endif
#include <trantor/utils/MsgBuffer.h>
#include <trantor/utils/NonCopyable.h>
#include <trantor/utils/Logger.h>
#include <functional>
#include <memory>
#include <string>

namespace trantor
{
class BufferNode;
using BufferNodePtr = std::shared_ptr<BufferNode>;
using StreamCallback = std::function<std::size_t(char *, std::size_t)>;
class BufferNode : public NonCopyable
{
public:
virtual bool isFile() const
{
return false;
}
virtual ~BufferNode() = default;
virtual bool isStream() const
{
return false;
}
virtual void getData(const char *&data, size_t &len) = 0;
virtual void append(const char *data, size_t len)
{
LOG_FATAL << "Not a memory buffer node";
}
virtual void retrieve(size_t len) = 0;
virtual size_t remainingBytes() const = 0;
virtual int getFd() const
{
LOG_FATAL << "Not a file buffer node";
return -1;
}
void done()
{
isDone_ = true;
}
static BufferNodePtr newMemBufferNode();

static BufferNodePtr newStreamBufferNode(StreamCallback &&cb);
#ifdef _WIN32
static BufferNodePtr newFileBufferNode(FILE *fp,
long long offset,
size_t length);
#else
static BufferNodePtr newFileBufferNode(int fd, off_t offset, size_t length);
#endif
protected:
bool isDone_{false};
};

} // namespace trantor
67 changes: 67 additions & 0 deletions trantor/net/inner/FileBufferNodeUnix.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
#include <trantor/net/inner/BufferNode.h>

namespace trantor
{
static const size_t kMaxSendFileBufferSize = 16 * 1024;
class FileBufferNode : public BufferNode
{
public:
FileBufferNode(int fd, off_t offset, size_t length)
: sendFd_(fd), fileBytesToSend_(length)
{
assert(fd >= 0);
lseek(sendFd_, offset, SEEK_SET);
}
bool isFile() const override
{
return true;
}
int getFd() const override
{
return sendFd_;
}
void getData(const char *&data, size_t &len) override
{
if (msgBuffer_.readableBytes() == 0)
{
msgBuffer_.ensureWritableBytes(
std::min(kMaxSendFileBufferSize, fileBytesToSend_));
auto n = read(sendFd_,
msgBuffer_.beginWrite(),
msgBuffer_.writableBytes());
if (n > 0)
{
msgBuffer_.hasWritten(n);
}
else
{
LOG_SYSERR << "FileBufferNode::getData()";
}
}
data = msgBuffer_.peek();
len = msgBuffer_.readableBytes();
}
void retrieve(size_t len) override
{
msgBuffer_.retrieve(len);
fileBytesToSend_ -= len;
}
size_t remainingBytes() const override
{
if (isDone_)
return 0;
return fileBytesToSend_;
}
~FileBufferNode() override;

private:
int sendFd_{-1};
ssize_t fileBytesToSend_{0};
MsgBuffer msgBuffer_;
};

BufferNodePtr BufferNode::newFileBufferNode(int fd, off_t offset, size_t length)
{
return std::make_shared<FileBufferNode>(fd, offset, length);
}
} // namespace trantor
82 changes: 82 additions & 0 deletions trantor/net/inner/FileBufferNodeWin.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
#include <trantor/net/inner/BufferNode.h>
namespace trantor
{
static const size_t kMaxSendFileBufferSize = 16 * 1024;
class FileBufferNode : public BufferNode
{
public:
FileBufferNode(FILE *fp, long long offset, size_t length)
: sendFp_(fp), fileBytesToSend_(length)
{
assert(fp);
_fseeki64(sendFp_, offset, SEEK_SET);
}

bool isFile() const override
{
return true;
}

void getData(const char *&data, size_t &len) override
{
if (msgBuffer_.readableBytes() == 0)
{
msgBuffer_.ensureWritableBytes(kMaxSendFileBufferSize <
fileBytesToSend_
? kMaxSendFileBufferSize
: fileBytesToSend_);
auto n = fread(msgBuffer_.beginWrite(),
1,
msgBuffer_.writableBytes(),
sendFp_);
if (n > 0)
{
msgBuffer_.hasWritten(n);
}
else
{
LOG_SYSERR << "FileBufferNode::getData()";
}
}
data = msgBuffer_.peek();
len = msgBuffer_.readableBytes();
}
void retrieve(size_t len) override
{
msgBuffer_.retrieve(len);
fileBytesToSend_ -= len;
}
size_t remainingBytes() const override
{
if (isDone_)
return 0;
return fileBytesToSend_;
}
~FileBufferNode() override
{
if (sendFp_)
{
fclose(sendFp_);
}
}
int getFd() const override
{
LOG_ERROR << "getFd() is not supported on Windows";
return 0;
}

private:
FILE *sendFp_{nullptr};
long long offset_{0};

ssize_t fileBytesToSend_{0};

MsgBuffer msgBuffer_;
};
BufferNodePtr BufferNode::newFileBufferNode(FILE *fp,
long long offset,
size_t length)
{
return std::make_shared<FileBufferNode>(fp, offset, length);
}
} // namespace trantor
36 changes: 36 additions & 0 deletions trantor/net/inner/MemBufferNode.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
#include <trantor/net/inner/BufferNode.h>
namespace trantor
{
class MemBufferNode : public BufferNode
{
public:
MemBufferNode() = default;

void getData(const char *&data, size_t &len) override
{
data = buffer_.peek();
len = buffer_.readableBytes();
}
void retrieve(size_t len) override
{
buffer_.retrieve(len);
}
size_t remainingBytes() const override
{
if (isDone_)
return 0;
return buffer_.readableBytes();
}
void append(const char *data, size_t len) override
{
buffer_.append(data, len);
}

private:
trantor::MsgBuffer buffer_;
};
BufferNodePtr BufferNode::newMemBufferNode()
{
return std::make_shared<MemBufferNode>();
}
} // namespace trantor
67 changes: 67 additions & 0 deletions trantor/net/inner/StreamBufferNode.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
#include <trantor/net/inner/BufferNode.h>
namespace trantor
{
static const size_t kMaxSendFileBufferSize = 16 * 1024;
class StreamBufferNode : public BufferNode
{
public:
StreamBufferNode(std::function<std::size_t(char *, std::size_t)> &&callback)
: streamCallback_(std::move(callback))
{
}
bool isStream() const override
{
return true;
}
void getData(const char *&data, size_t &len) override
{
if (msgBuffer_.readableBytes() == 0)
{
msgBuffer_.ensureWritableBytes(kMaxSendFileBufferSize);
auto n = streamCallback_(msgBuffer_.beginWrite(),
msgBuffer_.writableBytes());
if (n > 0)
{
msgBuffer_.hasWritten(n);
}
else
{
isDone_ = true;
}
}
data = msgBuffer_.peek();
len = msgBuffer_.readableBytes();
}
void retrieve(size_t len) override
{
msgBuffer_.retrieve(len);
#ifndef NDEBUG
dataWritten_ += len;
LOG_TRACE << "send stream in loop: bytes written: " << dataWritten_
<< " / total bytes written: " << dataWritten_;
#endif
}
size_t remainingBytes() const override
{
if (isDone_)
return 0;
return 1;
}
~StreamBufferNode() override
{
if (streamCallback_)
streamCallback_(nullptr, 0); // cleanup callback internals
}

private:
std::function<std::size_t(char *, std::size_t)> streamCallback_;
#ifndef NDEBUG // defined by CMake for release build
std::size_t dataWritten_{0};
#endif
MsgBuffer msgBuffer_;
};
BufferNodePtr BufferNode::newStreamBufferNode(StreamCallback &&callback)
{
return std::make_shared<StreamBufferNode>(std::move(callback));
}
} // namespace trantor
Loading