Skip to content

Simplify the code for sending data #308

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 17 commits into
base: master
Choose a base branch
from
11 changes: 10 additions & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,9 @@ set(TRANTOR_SOURCES
trantor/net/inner/Connector.cc
trantor/net/inner/Poller.cc
trantor/net/inner/Socket.cc
trantor/net/inner/MemBufferNode.cc
trantor/net/inner/StreamBufferNode.cc
trantor/net/inner/AsyncStreamBufferNode.cc
trantor/net/inner/TcpConnectionImpl.cc
trantor/net/inner/Timer.cc
trantor/net/inner/TimerQueue.cc
Expand All @@ -126,11 +129,16 @@ if(WIN32)
set(TRANTOR_SOURCES
${TRANTOR_SOURCES}
third_party/wepoll/Wepoll.c
trantor/utils/WindowsSupport.cc)
trantor/utils/WindowsSupport.cc
trantor/net/inner/FileBufferNodeWin.cc)
set(private_headers
${private_headers}
third_party/wepoll/Wepoll.h
trantor/utils/WindowsSupport.h)
else(WIN32)
set(TRANTOR_SOURCES
${TRANTOR_SOURCES}
trantor/net/inner/FileBufferNodeUnix.cc)
endif(WIN32)

# Somehow the default value of TRANTOR_USE_TLS is OFF
Expand Down Expand Up @@ -283,6 +291,7 @@ set(public_net_headers
trantor/net/TcpClient.h
trantor/net/TcpConnection.h
trantor/net/TcpServer.h
trantor/net/AsyncStream.h
trantor/net/callbacks.h
trantor/net/Resolver.h
trantor/net/Channel.h
Expand Down
39 changes: 39 additions & 0 deletions trantor/net/AsyncStream.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/**
*
* @file AsyncStream.h
* @author An Tao
*
* Public header file in trantor lib.
*
* Copyright 2023, An Tao. All rights reserved.
* Use of this source code is governed by a BSD-style license
* that can be found in the License file.
*
*
*/

#pragma once

#include <trantor/utils/NonCopyable.h>
#include <memory>

namespace trantor
{
/**
* @brief This class represents a data stream that can be sent asynchronously.
* The data is sent in chunks, and the chunks are sent in order, and all the
* chunks are sent continuously.
*/
class TRANTOR_EXPORT AsyncStream : public NonCopyable
{
public:
virtual ~AsyncStream() = default;
virtual void send(const char *msg, size_t len) = 0;
void send(const std::string &msg)
{
send(msg.data(), msg.length());
}
virtual void close() = 0;
};
using AsyncStreamPtr = std::unique_ptr<AsyncStream>;
} // namespace trantor
8 changes: 8 additions & 0 deletions trantor/net/TcpConnection.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include <trantor/net/callbacks.h>
#include <trantor/net/Certificate.h>
#include <trantor/net/TLSPolicy.h>
#include <trantor/net/AsyncStream.h>
#include <memory>
#include <functional>
#include <string>
Expand Down Expand Up @@ -95,11 +96,18 @@ class TRANTOR_EXPORT TcpConnection
callback) = 0; // (buffer, buffer size) -> size
// of data put in buffer

/**
* @brief Send a stream to the peer asynchronously.
* @note The subsequent data sent after the async stream will be sent after
* the stream is closed.
*/
virtual AsyncStreamPtr sendAsyncStream() = 0;
/**
* @brief Get the local address of the connection.
*
* @return const InetAddress&
*/

virtual const InetAddress &localAddr() const = 0;

/**
Expand Down
65 changes: 65 additions & 0 deletions trantor/net/inner/AsyncStreamBufferNode.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
#include <trantor/net/inner/BufferNode.h>

namespace trantor
{
class AsyncBufferNode : public BufferNode
{
public:
AsyncBufferNode() = default;
~AsyncBufferNode() override = default;
bool isAsync() const override
{
return true;
}
bool isStream() const override
{
return true;
}
size_t remainingBytes() const override
{
if (msgBufferPtr_)
return msgBufferPtr_->readableBytes();
return 0;
}
bool available() const override
{
return !isDone_;
}
void getData(const char *&data, size_t &len) override
{
if (msgBufferPtr_)
{
data = msgBufferPtr_->peek();
len = msgBufferPtr_->readableBytes();
}
else
{
data = nullptr;
len = 0;
}
}
void retrieve(size_t len) override
{
assert(msgBufferPtr_);
if (msgBufferPtr_)
{
msgBufferPtr_->retrieve(len);
}
}
void append(const char *data, size_t len) override
{
if (!msgBufferPtr_)
{
msgBufferPtr_ = std::make_unique<MsgBuffer>(len);
}
msgBufferPtr_->append(data, len);
}

private:
std::unique_ptr<MsgBuffer> msgBufferPtr_;
};
BufferNodePtr BufferNode::newAsyncStreamBufferNode()
{
return std::make_shared<AsyncBufferNode>();
}
} // namespace trantor
86 changes: 86 additions & 0 deletions trantor/net/inner/BufferNode.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
/**
*
* @file BufferNode.h
* @author An Tao
*
* Public header file in trantor lib.
*
* Copyright 2018, An Tao. All rights reserved.
* Use of this source code is governed by a BSD-style license
* that can be found in the License file.
*
*
*/

#pragma once
#ifdef _WIN32
#include <stdio.h>
#endif
#include <trantor/utils/MsgBuffer.h>
#include <trantor/utils/NonCopyable.h>
#include <trantor/utils/Logger.h>
#include <functional>
#include <memory>
#include <string>

namespace trantor
{
class BufferNode;
using BufferNodePtr = std::shared_ptr<BufferNode>;
using StreamCallback = std::function<std::size_t(char *, std::size_t)>;
class BufferNode : public NonCopyable
{
public:
virtual bool isFile() const
{
return false;
}
virtual ~BufferNode() = default;
virtual bool isStream() const
{
return false;
}
virtual void getData(const char *&data, size_t &len) = 0;
virtual void append(const char *, size_t)
{
LOG_FATAL << "Not a memory buffer node";
}
virtual void retrieve(size_t len) = 0;
virtual size_t remainingBytes() const = 0;
virtual int getFd() const
{
LOG_FATAL << "Not a file buffer node";
return -1;
}
virtual bool available() const
{
return true;
}
virtual bool isAsync() const
{
return false;
}

void done()
{
isDone_ = true;
}
static BufferNodePtr newMemBufferNode();

static BufferNodePtr newStreamBufferNode(StreamCallback &&cb);
#ifdef _WIN32
static BufferNodePtr newFileBufferNode(const wchar_t *fileName,
long long offset,
size_t length);
#else
static BufferNodePtr newFileBufferNode(const char *fileName,
off_t offset,
size_t length);
#endif
static BufferNodePtr newAsyncStreamBufferNode();

protected:
bool isDone_{false};
};

} // namespace trantor
Loading