Skip to content

Commit bfda234

Browse files
authored
feat: removed SPSCQueue and made the publishing direct (#46)
* feat: removed SPSCQueue and made the publishing direct * fix: fixed memory leaks
1 parent 8a50e58 commit bfda234

File tree

6 files changed

+130
-354
lines changed

6 files changed

+130
-354
lines changed

CMakeLists.txt

Lines changed: 46 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -18,34 +18,35 @@ include(cmake/FetchPackages.cmake)
1818

1919
add_library(shared INTERFACE)
2020

21-
file(GLOB_RECURSE LIBPHONE_SOURCES
22-
source/libphone/*.cpp
23-
)
21+
file(GLOB_RECURSE LIBPHONE_SOURCES source/libphone/*.cpp)
2422
add_library(phone ${LIBPHONE_SOURCES})
2523

26-
file(GLOB_RECURSE MEGAPHONE_SOURCES
27-
source/megaphone/*.cpp
28-
)
24+
file(GLOB_RECURSE MEGAPHONE_SOURCES source/megaphone/*.cpp)
2925
add_executable(megaphone ${MEGAPHONE_SOURCES})
3026

3127
# Set compiler flags
3228
set(COMPILER_FLAGS
3329
# All the warnings
34-
-Wall -Wextra
30+
-Wall
31+
-Wextra
3532
# Except for the ones that aren't mine
36-
-Wno-deprecated -Wno-unused-parameter -Wno-deprecated-declarations
33+
-Wno-deprecated
34+
-Wno-unused-parameter
35+
-Wno-deprecated-declarations
3736
-Wno-macro-redefined
3837
# Stack pointer
39-
-fno-omit-frame-pointer
40-
)
38+
-fno-omit-frame-pointer)
4139

4240
if(MEGAPHONE_ENABLE_ASAN)
43-
list(APPEND COMPILER_FLAGS
44-
# Add asan options
45-
-fsanitize=address -fsanitize=pointer-compare
46-
-fsanitize=pointer-subtract -fsanitize=float-divide-by-zero
47-
-fsanitize=float-cast-overflow
48-
)
41+
list(
42+
APPEND
43+
COMPILER_FLAGS
44+
# Add asan options
45+
-fsanitize=address
46+
-fsanitize=pointer-compare
47+
-fsanitize=pointer-subtract
48+
-fsanitize=float-divide-by-zero
49+
-fsanitize=float-cast-overflow)
4950
endif()
5051

5152
target_compile_options(megaphone PUBLIC ${COMPILER_FLAGS})
@@ -55,48 +56,43 @@ target_compile_options(phone PUBLIC ${COMPILER_FLAGS})
5556
target_link_options(phone PUBLIC ${COMPILER_FLAGS})
5657

5758
# Add include dirs
58-
target_include_directories(shared
59-
INTERFACE source/flatbuffers/generated
60-
INTERFACE source/spscqueue
61-
)
59+
target_include_directories(shared INTERFACE source/flatbuffers/generated)
6260
target_include_directories(megaphone PUBLIC source/megaphone)
6361
target_include_directories(phone PUBLIC source/libphone)
6462

6563
# Link all the libs to each other
66-
target_link_libraries(phone
67-
PUBLIC shared
68-
PUBLIC uWebSockets
69-
PUBLIC spdlog::spdlog
70-
PUBLIC rapidjson
71-
PUBLIC zenohcxx::zenohc::lib
72-
PUBLIC zenohcxx::zenohpico
73-
PUBLIC flatbuffers
74-
)
64+
target_link_libraries(
65+
phone
66+
PUBLIC shared
67+
PUBLIC uWebSockets
68+
PUBLIC spdlog::spdlog
69+
PUBLIC rapidjson
70+
PUBLIC zenohcxx::zenohc::lib
71+
PUBLIC zenohcxx::zenohpico
72+
PUBLIC flatbuffers)
7573
target_link_libraries(megaphone PUBLIC phone)
7674

7775
# Turn clang-tidy off/on
78-
if (USE_CLANG_TIDY)
79-
message("Using clang tidy")
80-
# Find clang tidy
81-
find_program(CLANG_TIDY_EXE NAMES "clang-tidy")
82-
set(CLANG_TIDY_COMMAND "${CLANG_TIDY_EXE}" "-warnings-as-errors=true -checks=*,-llvmlibc-*,-google-*,-fuchsia-*,-android-*,-altera-*,-abseil-*,-boost-*,-objc-*,-openmp-*,-zircon-*,-misc-include-cleaner,-readability-convert-member-functions-to-static")
83-
84-
set_target_properties(megaphone PROPERTIES
85-
CXX_CLANG_TIDY "${CLANG_TIDY_COMMAND}"
86-
)
87-
set_target_properties(phone PROPERTIES
88-
CXX_CLANG_TIDY "${CLANG_TIDY_COMMAND}"
89-
)
76+
if(USE_CLANG_TIDY)
77+
message("Using clang tidy")
78+
# Find clang tidy
79+
find_program(CLANG_TIDY_EXE NAMES "clang-tidy")
80+
set(CLANG_TIDY_COMMAND
81+
"${CLANG_TIDY_EXE}"
82+
"-warnings-as-errors=true -checks=*,-llvmlibc-*,-google-*,-fuchsia-*,-android-*,-altera-*,-abseil-*,-boost-*,-objc-*,-openmp-*,-zircon-*,-misc-include-cleaner,-readability-convert-member-functions-to-static"
83+
)
84+
85+
set_target_properties(megaphone PROPERTIES CXX_CLANG_TIDY
86+
"${CLANG_TIDY_COMMAND}")
87+
set_target_properties(phone PROPERTIES CXX_CLANG_TIDY "${CLANG_TIDY_COMMAND}")
9088
endif()
9189

9290
# Tests
93-
if (MEGAPHONE_ENABLE_TESTS)
94-
file(GLOB_RECURSE TESTS_SOURCES
95-
tests/*.cpp
96-
)
97-
add_executable(phonetests ${TESTS_SOURCES})
98-
target_link_libraries(phonetests
99-
PRIVATE Catch2::Catch2WithMain
100-
PRIVATE phone
101-
)
91+
if(MEGAPHONE_ENABLE_TESTS)
92+
file(GLOB_RECURSE TESTS_SOURCES tests/*.cpp)
93+
add_executable(phonetests ${TESTS_SOURCES})
94+
target_link_libraries(
95+
phonetests
96+
PRIVATE Catch2::Catch2WithMain
97+
PRIVATE phone)
10298
endif()

source/libphone/fbhandler/fbhandler.hpp

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,8 +41,7 @@ class FBHandler {
4141
auto handle_invalid_buffer = [&]() { return "Invalid buffer received, PLEASE REPORT THIS to the provider"; };
4242

4343
auto process_valid_buffer = [&](auto&& data_flatbuf) {
44-
IterateFlatBuffer(buf, type_table, &flatbuf_visitor);
45-
return flatbuf_visitor.s;
44+
return flatbuffers::FlatBufferToString(buf, type_table);
4645
};
4746

4847
if constexpr (std::is_same_v<flat_buf_type, Bitwyre::Flatbuffers::Depthl2::DepthEvent>) {

source/libphone/libphone.cpp

Lines changed: 67 additions & 69 deletions
Original file line numberDiff line numberDiff line change
@@ -13,23 +13,60 @@
1313

1414
namespace LibPhone {
1515

16-
Phone::Phone(zenohc::Session& session)
16+
Phone::Phone()
1717
: m_app(uWSAppWrapper({.passphrase = utils::ENVManager::get_instance().get_megaphone_uws_passphrase().c_str()})),
1818
m_supported_instruments(utils::ENVManager::get_instance().get_megaphone_supported_instruments()),
1919
m_serializer(),
20-
m_fbhandler(),
2120
m_zenoh_subscriber(nullptr) {
21+
2222
SPDLOG_INFO("Supported instruments:");
2323
for (auto& instrument : this->m_supported_instruments) {
2424
SPDLOG_INFO("\t{}", instrument);
2525
}
2626

27+
this->m_app.ws<PerSocketData>(
28+
"/*",
29+
{.compression = uWS::DISABLED,
30+
.maxPayloadLength = 16 * 2048 * 2048,
31+
.idleTimeout = 960,
32+
.maxBackpressure = 16 * 2048 * 2048,
33+
.closeOnBackpressureLimit = false,
34+
.resetIdleTimeoutOnSend = false,
35+
.sendPingsAutomatically = false,
36+
/* Handlers */
37+
.upgrade = nullptr,
38+
.open = [this](auto* ws) { this->on_ws_open(ws); },
39+
.message = [this](auto* ws, std::string_view message,
40+
uWS::OpCode opCode) { this->on_ws_message(ws, message, opCode); },
41+
.drain = nullptr,
42+
.ping = [this](auto* ws, std::string_view message) { this->on_ws_ping(ws, message); },
43+
.pong = [this](auto* ws, std::string_view message) { this->on_ws_pong(ws, message); },
44+
.close = [this](auto* ws, int code, std::string_view message) { this->on_ws_close(ws, code, message); }});
45+
46+
this->m_loop = uWS::Loop::get();
47+
48+
auto* loop_t = reinterpret_cast<struct us_loop_t*>(this->m_loop);
49+
auto* delay_timer = us_create_timer(loop_t, 0, 0);
50+
us_timer_set(
51+
delay_timer, [](struct us_timer_t*) {}, 1, 1);
52+
53+
this->m_app.listen(PORT, [](auto* listen_socket) {
54+
if (listen_socket) {
55+
SPDLOG_INFO("Listening on port {}", PORT);
56+
}
57+
});
58+
};
59+
60+
Phone::~Phone() { }
61+
62+
auto Phone::run(zenohc::Session& session) -> void {
63+
2764
this->m_zenoh_subscriber = zenohc::expect<zenohc::Subscriber>(
2865
session.declare_subscriber("bitwyre/megaphone/websockets", [&](const zenohc::Sample& sample) {
29-
std::string encoding {sample.get_encoding().get_suffix().as_string_view()};
30-
std::string datacopy {sample.get_payload().as_string_view()};
3166
std::string data {};
3267
std::string instrument {};
68+
std::string encoding {sample.get_encoding().get_suffix().as_string_view()};
69+
std::string datacopy {reinterpret_cast<const char*>(sample.get_payload().start), sample.get_payload().len};
3370

3471
std::transform(encoding.begin(), encoding.end(), encoding.begin(),
3572
[](unsigned char c) { return std::tolower(c); });
@@ -73,76 +110,39 @@ Phone::Phone(zenohc::Session& session)
73110

74111
SPDLOG_INFO("Event type: {}\n\tInstrument: {}\n\tData: {}", encoding, instrument, data);
75112

76-
this->m_zenoh_queue.push(MEMessage {encoding, instrument, data});
113+
publish_result(MEMessage {encoding, instrument, data});
77114
}));
78115

79-
this->m_app.ws<PerSocketData>(
80-
"/*",
81-
{.compression = uWS::DISABLED,
82-
.maxPayloadLength = 16 * 1024 * 1024,
83-
.idleTimeout = 960,
84-
.maxBackpressure = 1 * 1024 * 1024,
85-
.closeOnBackpressureLimit = false,
86-
.resetIdleTimeoutOnSend = false,
87-
.sendPingsAutomatically = false,
88-
/* Handlers */
89-
.upgrade = nullptr,
90-
.open = [this](auto* ws) { this->on_ws_open(ws); },
91-
.message = [this](auto* ws, std::string_view message,
92-
uWS::OpCode opCode) { this->on_ws_message(ws, message, opCode); },
93-
.drain = [this](auto* ws) { this->on_ws_drain(ws); },
94-
.ping = [this](auto* ws, std::string_view message) { this->on_ws_ping(ws, message); },
95-
.pong = [this](auto* ws, std::string_view message) { this->on_ws_pong(ws, message); },
96-
.close = [this](auto* ws, int code, std::string_view message) { this->on_ws_close(ws, code, message); }});
97-
98-
auto* loop = uWS::Loop::get();
116+
this->m_app.run();
117+
}
99118

100-
auto* loop_t = reinterpret_cast<struct us_loop_t*>(loop);
101-
auto* delay_timer = us_create_timer(loop_t, 0, 0);
102-
us_timer_set(
103-
delay_timer, [](struct us_timer_t*) {}, 1, 1);
119+
auto Phone::publish_result(MEMessage&& item) -> void {
104120

105-
loop->addPostHandler(nullptr, [this](uWS::Loop* p_loop) {
106-
p_loop->defer([this]() {
107-
if (this->m_zenoh_queue.front() != nullptr) {
121+
this->m_loop->defer([=]() {
122+
// This needs to be done as the publish function takes in a string view, ie, a reference.
123+
const auto data_copy {item.data};
124+
const auto topic = item.msg_type + ':' + item.instrument;
108125

109-
publish_result(std::move(*this->m_zenoh_queue.front()));
110-
this->m_zenoh_queue.pop();
126+
// Publish to the global ticker
127+
if (item.msg_type == "ticker") {
128+
if (!this->m_app.publish(item.msg_type, data_copy, uWS::OpCode::TEXT, false)) {
129+
// This log is debug as publish fails if the topic doesn't exist for the user
130+
// that results in a lot of false-positive error logs.
131+
SPDLOG_DEBUG("Failed to publish to topic: {}", topic);
111132
}
112-
});
113-
});
114-
115-
this->m_app.listen(PORT, [](auto* listen_socket) {
116-
if (listen_socket) {
117-
SPDLOG_INFO("Listening on port {}", PORT);
118133
}
119-
});
120-
};
121134

122-
Phone::~Phone() { }
123-
124-
auto Phone::run() -> void { this->m_app.run(); }
125-
126-
auto Phone::publish_result(LibPhone::MEMessage&& item) noexcept -> void {
127-
auto topic = item.msg_type + ':' + item.instrument;
128-
129-
// Publish to the global ticker
130-
if (item.msg_type == "ticker") {
131-
if (!this->m_app.publish(item.msg_type, item.data, uWS::OpCode::TEXT, false)) {
132-
// This log is debug as publish fails if the topic doesn't exist for the user
133-
// that results in a lot of false-positive error logs.
135+
// as well as the global ticker
136+
// TODO: Make a separate thread for each type of event
137+
if (!this->m_app.publish(topic, item.data, uWS::OpCode::TEXT, false)) {
134138
SPDLOG_DEBUG("Failed to publish to topic: {}", topic);
135139
}
136-
}
140+
});
137141

138-
// as well as the global ticker
139-
// TODO: Make a separate thread for each type of event
140-
if (!this->m_app.publish(topic, item.data, uWS::OpCode::TEXT, false)) {
141-
SPDLOG_DEBUG("Failed to publish to topic: {}", topic);
142-
}
142+
return;
143143
};
144144

145-
auto Phone::on_ws_open(uWSWebSocket* ws) noexcept -> void {
145+
auto Phone::on_ws_open(uWSWebSocket* ws) -> void {
146146
/* Open event here, you may access ws->getUserData() which points to a
147147
* PerSocketData struct */
148148
PerSocketData* perSocketData = (PerSocketData*)ws->getUserData();
@@ -155,17 +155,15 @@ auto Phone::on_ws_open(uWSWebSocket* ws) noexcept -> void {
155155
"subscribe to a channel.");
156156
}
157157

158-
auto Phone::on_ws_message(uWSWebSocket* ws, std::string_view message, uWS::OpCode opCode) noexcept -> void {
158+
auto Phone::on_ws_message(uWSWebSocket* ws, std::string_view message, uWS::OpCode opCode) -> void {
159159
PerSocketData* perSocketData = (PerSocketData*)ws->getUserData();
160160

161161
const auto& [success, req] = this->m_serializer.parse_request(message);
162162

163163
if (success) {
164164

165-
perSocketData->topics = std::move(req.topics);
166-
167165
SPDLOG_INFO("user {} has subscribed to topics: ", perSocketData->user);
168-
for (auto& topic : perSocketData->topics) {
166+
for (auto& topic : req.topics) {
169167
if (ws->subscribe(topic)) {
170168
SPDLOG_INFO("\t - {}", topic);
171169
} else {
@@ -179,12 +177,12 @@ auto Phone::on_ws_message(uWSWebSocket* ws, std::string_view message, uWS::OpCod
179177
}
180178
}
181179

182-
auto Phone::on_ws_drain(uWSWebSocket* ws) noexcept -> void { }
180+
auto Phone::on_ws_drain(uWSWebSocket* ws) -> void { }
183181

184-
auto Phone::on_ws_ping(uWSWebSocket* ws, std::string_view message) noexcept -> void { }
182+
auto Phone::on_ws_ping(uWSWebSocket* ws, std::string_view message) -> void { }
185183

186-
auto Phone::on_ws_pong(uWSWebSocket* ws, std::string_view message) noexcept -> void { }
184+
auto Phone::on_ws_pong(uWSWebSocket* ws, std::string_view message) -> void { }
187185

188-
auto Phone::on_ws_close(uWSWebSocket* ws, int code, std::string_view message) noexcept -> void { }
186+
auto Phone::on_ws_close(uWSWebSocket* ws, int code, std::string_view message) -> void { }
189187

190188
} // namespace LibPhone

0 commit comments

Comments
 (0)