Skip to content

Commit b27f152

Browse files
GPINs Teamcopybara-github
GPINs Team
authored andcommitted
Adding stats collection
This changelist adds methods to record essential stats for each gRPC connection and expose them. PiperOrigin-RevId: 615700708
1 parent 72a2421 commit b27f152

File tree

3 files changed

+110
-1
lines changed

3 files changed

+110
-1
lines changed

server/gnpsi_service_impl.cc

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,18 +2,60 @@
22

33
#include <memory>
44
#include <string>
5+
#include <vector>
56

67
#include "absl/base/thread_annotations.h"
78
#include "glog/logging.h"
89
#include "absl/status/status.h"
10+
#include "absl/strings/match.h"
11+
#include "absl/strings/numbers.h"
912
#include "absl/strings/str_cat.h"
13+
#include "absl/strings/string_view.h"
14+
#include "absl/strings/strip.h"
1015
#include "absl/synchronization/mutex.h"
1116
#include "absl/time/clock.h"
1217
#include "absl/time/time.h"
1318
#include "proto/gnpsi/gnpsi.pb.h"
1419

1520
namespace gnpsi {
1621

22+
absl::Status GnpsiConnection::InitializeStats() {
23+
std::string uri = this->GetPeerName();
24+
LOG(INFO) << "uri: " << uri;
25+
absl::string_view ip;
26+
int port;
27+
if (absl::StartsWith(uri, kIpv4Indicator)) {
28+
absl::string_view path =
29+
absl::StripPrefix(uri, absl::StrCat(kIpv4Indicator, ":"));
30+
if (int colon = path.find(':'); colon != absl::string_view::npos) {
31+
ip = path.substr(0, colon);
32+
if (!absl::SimpleAtoi(path.substr(colon + 1), &port)) {
33+
return absl::InternalError(
34+
"Error retrieving port information from uri string");
35+
}
36+
this->stats_ = GnpsiStats(ip, port);
37+
return absl::OkStatus();
38+
}
39+
return absl::NotFoundError("Port not found in uri string");
40+
}
41+
if (absl::StartsWith(uri, kIpv6Indicator)) {
42+
absl::string_view path =
43+
absl::StripPrefix(uri, absl::StrCat(kIpv6Indicator, ":%5B"));
44+
if (int closing_bracket = path.find("%5D");
45+
closing_bracket != absl::string_view::npos) {
46+
ip = path.substr(0, closing_bracket);
47+
if (!absl::SimpleAtoi(path.substr(closing_bracket + 4), &port)) {
48+
return absl::InternalError(
49+
"Error retrieving port information from uri string");
50+
}
51+
this->stats_ = GnpsiStats(ip, port);
52+
return absl::OkStatus();
53+
}
54+
return absl::NotFoundError("Port not found in uri string");
55+
}
56+
return absl::InvalidArgumentError("The passed URI format is not supported");
57+
}
58+
1759
void GnpsiConnection::WaitUntilClosed() {
1860
absl::MutexLock l(&mu_);
1961
auto stream_disconnected = [this]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) {
@@ -71,6 +113,10 @@ absl::Status GnpsiServiceImpl::AddConnection(GnpsiConnection* connection) {
71113
client_max_number_, " clients."));
72114
}
73115
LOG(INFO) << "Add " << connection->GetPeerName() << " to client list.";
116+
if (absl::Status status = connection->InitializeStats(); !status.ok()) {
117+
LOG(ERROR) << "Error while creating stats object for peer - "
118+
<< status.message();
119+
}
74120
gnpsi_connections_.push_back(connection);
75121
return absl::OkStatus();
76122
}
@@ -113,13 +159,25 @@ void GnpsiServiceImpl::SendSamplePacket(
113159
connection->SendResponse(response)) {
114160
VLOG(1) << "Successfully sent sample packet to "
115161
<< connection->GetPeerName() << ".";
162+
connection->IncrementDatagramCount();
163+
connection->IncrementBytesSampled(sample_packet.size());
116164
} else {
117165
// If it fails to send response to any client, close this connection.
118166
LOG(ERROR) << "Failed to send sample packet to "
119167
<< connection->GetPeerName() << ".";
168+
connection->IncrementWriteErrorCount();
120169
connection->CloseStream();
121170
}
122171
}
123172
}
124173

174+
std::vector<GnpsiStats> GnpsiServiceImpl::GetStats() {
175+
absl::MutexLock l(&mu_);
176+
std::vector<GnpsiStats> stats;
177+
for (auto it = gnpsi_connections_.begin(), end = gnpsi_connections_.end();
178+
it != end; it++) {
179+
stats.push_back((*it)->GetConnectionStats());
180+
}
181+
return stats;
182+
}
125183
} // namespace gnpsi

server/gnpsi_service_impl.h

Lines changed: 50 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,11 @@
11
#ifndef OPENCONFIG_GNPSI_SERVER_GNPSI_SERVICE_IMPL_H_
22
#define OPENCONFIG_GNPSI_SERVER_GNPSI_SERVICE_IMPL_H_
33

4+
#include <cstdint>
45
#include <string>
56
#include <vector>
67

8+
#include "absl/status/status.h"
79
#include "absl/synchronization/mutex.h"
810
#include "grpcpp/server_context.h"
911
#include "grpcpp/support/status.h"
@@ -18,6 +20,25 @@ using ::grpc::ServerWriterInterface;
1820
using ::grpc::Status;
1921
using ::grpc::StatusCode;
2022

23+
inline constexpr absl::string_view kIpv4Indicator = "ipv4";
24+
inline constexpr absl::string_view kIpv6Indicator = "ipv6";
25+
26+
struct GnpsiStats {
27+
GnpsiStats() : datagram_count(0), bytes_sampled(0), error_count(0) {}
28+
GnpsiStats(absl::string_view ip, int port)
29+
: collector_ip(ip),
30+
collector_port(port),
31+
datagram_count(0),
32+
bytes_sampled(0),
33+
error_count(0) {}
34+
35+
std::string collector_ip;
36+
int collector_port;
37+
int datagram_count;
38+
uint64_t bytes_sampled;
39+
int error_count;
40+
};
41+
2142
// Interface to gNPSI sender method
2243
class GnpsiSenderInterface {
2344
public:
@@ -26,6 +47,7 @@ class GnpsiSenderInterface {
2647
const std::string& sample_packet,
2748
SFlowMetadata::Version version = SFlowMetadata::V5) = 0;
2849
virtual void DrainConnections() = 0;
50+
virtual std::vector<GnpsiStats> GetStats() = 0;
2951
};
3052

3153
// A connection between a client and gNPSI server. This class is thread-safe.
@@ -35,7 +57,9 @@ class GnpsiConnection {
3557
ServerWriterInterface<Sample>* writer)
3658
: context_(context), writer_(writer), is_stream_closed_(false) {}
3759

38-
std::string GetPeerName() const { return context_->peer(); }
60+
virtual ~GnpsiConnection() = default;
61+
62+
virtual std::string GetPeerName() const { return context_->peer(); }
3963

4064
bool IsContextCancelled() const { return context_->IsCancelled(); }
4165

@@ -52,13 +76,35 @@ class GnpsiConnection {
5276
// or context is cancelled.
5377
void WaitUntilClosed() ABSL_LOCKS_EXCLUDED(mu_);
5478

79+
// Initialize the stats object with the required connection related values.
80+
// IPV4 Uri = ipv4:a.b.c.d:e will set connection_ip = a.b.c.d and
81+
// connection_port = e. IPV6 Uri = ipv6:[a:b:c:d::]:e will set connection_ip =
82+
// a:b:c:d:: and connection_port = e.
83+
absl::Status InitializeStats();
84+
85+
// Increment the count of datagram sent to this connection by 1.
86+
void IncrementDatagramCount() { this->stats_.datagram_count++; }
87+
88+
// Increment the bytes sampled by size of packet.
89+
void IncrementBytesSampled(uint64_t packet_size) {
90+
this->stats_.bytes_sampled += packet_size;
91+
}
92+
93+
// Increment the count of write errors for this connection by 1.
94+
void IncrementWriteErrorCount() { this->stats_.error_count++; }
95+
96+
// Returns the number of datagrams sent to this connection.
97+
GnpsiStats GetConnectionStats() { return this->stats_; }
98+
5599
private:
56100
ServerContext* context_;
57101
ServerWriterInterface<::gnpsi::Sample>* writer_;
58102
// Lock for protecting is_stream_closed_.
59103
absl::Mutex mu_;
60104
// When set to true, it means stream is broken.
61105
bool is_stream_closed_ ABSL_GUARDED_BY(mu_);
106+
// Maintains the stats for this connections.
107+
GnpsiStats stats_;
62108
};
63109

64110
// Implementation of gNPSI server.
@@ -85,6 +131,9 @@ class GnpsiServiceImpl : public ::gnpsi::gNPSI::Service,
85131
// Closes all current conections and blocks any new incoming connections.
86132
void DrainConnections() ABSL_LOCKS_EXCLUDED(mu_) override;
87133

134+
// Returns stats per connection collected by the server.
135+
std::vector<GnpsiStats> GetStats() ABSL_LOCKS_EXCLUDED(mu_) override;
136+
88137
private:
89138
int client_max_number_;
90139
// Lock for protecting member fields.

server/mock_gnpsi_service_impl.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
#ifndef OPENCONFIG_GNPSI_SERVER_MOCK_GNPSI_SERVICE_IMPL_H_
22
#define OPENCONFIG_GNPSI_SERVER_MOCK_GNPSI_SERVICE_IMPL_H_
33

4+
#include <vector>
45
#include "gmock/gmock.h"
56
#include "server/gnpsi_service_impl.h"
67

@@ -13,6 +14,7 @@ class MockGnpsiServiceImpl : public GnpsiSenderInterface {
1314
SFlowMetadata::Version version),
1415
(override));
1516
MOCK_METHOD(void, DrainConnections, (), (override));
17+
MOCK_METHOD(std::vector<GnpsiStats>, GetStats, (), (override));
1618
};
1719
} // namespace gnpsi
1820

0 commit comments

Comments
 (0)