Skip to content
This repository was archived by the owner on Mar 25, 2025. It is now read-only.

Commit abacfd0

Browse files
committed
This commit contains a bunch of minor changes.
First it allows a ClientProfile to be updated with new reservation, weight, and limit values by adding an update function. Second it adds an ability to invoke callbacks when a ClientInfo object is removed due to the idle timeout. Testing this functionality has been added to the unit tests. Third we add the ability to clear a heap to help with a more controlled tear-down. Finally, dmclock-servers "cleaning" has been renamed "idle erase" to better indicate the role. Types and variable names have been adjusted accordingly. Signed-off-by: J. Eric Ivancich <[email protected]>
1 parent a994998 commit abacfd0

File tree

3 files changed

+82
-27
lines changed

3 files changed

+82
-27
lines changed

src/dmclock_server.h

Lines changed: 67 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535

3636
#include <cmath>
3737
#include <memory>
38+
#include <set>
3839
#include <map>
3940
#include <deque>
4041
#include <queue>
@@ -90,17 +91,18 @@ namespace crimson {
9091
double limit_inv;
9192

9293
// order parameters -- min, "normal", max
93-
ClientInfo(double _reservation, double _weight, double _limit) :
94-
reservation(_reservation),
95-
weight(_weight),
96-
limit(_limit),
97-
reservation_inv(0.0 == reservation ? 0.0 : 1.0 / reservation),
98-
weight_inv( 0.0 == weight ? 0.0 : 1.0 / weight),
99-
limit_inv( 0.0 == limit ? 0.0 : 1.0 / limit)
100-
{
101-
// empty
94+
ClientInfo(double _reservation, double _weight, double _limit) {
95+
update(_reservation, _weight, _limit);
10296
}
10397

98+
inline void update(double _reservation, double _weight, double _limit) {
99+
reservation = _reservation;
100+
weight = _weight;
101+
limit = _limit;
102+
reservation_inv = (0.0 == reservation) ? 0.0 : 1.0 / reservation;
103+
weight_inv = (0.0 == weight) ? 0.0 : 1.0 / weight;
104+
limit_inv = (0.0 == limit) ? 0.0 : 1.0 / limit;
105+
}
104106

105107
friend std::ostream& operator<<(std::ostream& out,
106108
const ClientInfo& client) {
@@ -771,12 +773,14 @@ namespace crimson {
771773
Duration idle_age;
772774
Duration erase_age;
773775
Duration check_time;
774-
std::deque<MarkPoint> clean_mark_points;
776+
std::deque<MarkPoint> idle_erase_mark_points;
775777

776-
// NB: All threads declared at end, so they're destructed first!
778+
using IdleEraseListener = std::function<void(const C&)>;
779+
std::set<const IdleEraseListener*> idle_erase_listeners;
777780

778-
std::unique_ptr<RunEvery> cleaning_job;
781+
// NB: All threads declared at end, so they're destructed first!
779782

783+
std::unique_ptr<RunEvery> idle_erase_job;
780784

781785
// COMMON constructor that others feed into; we can accept three
782786
// different variations of durations
@@ -797,15 +801,34 @@ namespace crimson {
797801
{
798802
assert(_erase_age >= _idle_age);
799803
assert(_check_time < _idle_age);
800-
cleaning_job =
804+
idle_erase_job =
801805
std::unique_ptr<RunEvery>(
802806
new RunEvery(check_time,
803-
std::bind(&PriorityQueueBase::do_clean, this)));
807+
std::bind(&PriorityQueueBase::do_idle_erase, this)));
804808
}
805809

806810

807811
~PriorityQueueBase() {
808812
finishing = true;
813+
814+
DataGuard g(data_mtx);
815+
816+
ready_heap.clear();
817+
limit_heap.clear();
818+
#if USE_PROP_HEAP
819+
prop_heap.clear();
820+
#endif
821+
resv_heap.clear();
822+
823+
for (auto c = client_map.begin(); client_map.end() != c; /* empty */) {
824+
auto current = c++;
825+
for (auto l : idle_erase_listeners) {
826+
(*l)(current->second->client);
827+
}
828+
client_map.erase(current);
829+
}
830+
831+
idle_erase_listeners.clear();
809832
}
810833

811834

@@ -1021,8 +1044,8 @@ namespace crimson {
10211044
void reduce_reservation_tags(const C& client_id) {
10221045
auto client_it = client_map.find(client_id);
10231046

1024-
// means the client was cleaned from map; should never happen
1025-
// as long as cleaning times are long enough
1047+
// means the client was idle-erased from map; should never
1048+
// happen as long as idle erase times are long enough
10261049
assert(client_map.end() != client_it);
10271050
reduce_reservation_tags(*client_it->second);
10281051
}
@@ -1116,27 +1139,27 @@ namespace crimson {
11161139
* This is being called regularly by RunEvery. Every time it's
11171140
* called it notes the time and delta counter (mark point) in a
11181141
* deque. It also looks at the deque to find the most recent
1119-
* mark point that is older than clean_age. It then walks the
1120-
* map and delete all server entries that were last used before
1121-
* that mark point.
1142+
* mark point that is older than idle-erase age. It then walks
1143+
* the map and delete all server entries that were last used
1144+
* before that mark point.
11221145
*/
1123-
void do_clean() {
1146+
void do_idle_erase() {
11241147
TimePoint now = std::chrono::steady_clock::now();
11251148
DataGuard g(data_mtx);
1126-
clean_mark_points.emplace_back(MarkPoint(now, tick));
1149+
idle_erase_mark_points.emplace_back(MarkPoint(now, tick));
11271150

11281151
// first erase the super-old client records
11291152

11301153
Counter erase_point = 0;
1131-
auto point = clean_mark_points.front();
1154+
auto point = idle_erase_mark_points.front();
11321155
while (point.first <= now - erase_age) {
11331156
erase_point = point.second;
1134-
clean_mark_points.pop_front();
1135-
point = clean_mark_points.front();
1157+
idle_erase_mark_points.pop_front();
1158+
point = idle_erase_mark_points.front();
11361159
}
11371160

11381161
Counter idle_point = 0;
1139-
for (auto i : clean_mark_points) {
1162+
for (auto i : idle_erase_mark_points) {
11401163
if (i.first <= now - idle_age) {
11411164
idle_point = i.second;
11421165
} else {
@@ -1148,14 +1171,17 @@ namespace crimson {
11481171
for (auto i = client_map.begin(); i != client_map.end(); /* empty */) {
11491172
auto i2 = i++;
11501173
if (erase_point && i2->second->last_tick <= erase_point) {
1174+
for (auto l : idle_erase_listeners) {
1175+
(*l)(i2->second->client);
1176+
}
11511177
delete_from_heaps(i2->second);
11521178
client_map.erase(i2);
11531179
} else if (idle_point && i2->second->last_tick <= idle_point) {
11541180
i2->second->idle = true;
11551181
}
11561182
} // for
11571183
} // if
1158-
} // do_clean
1184+
} // do_idle_erase
11591185

11601186

11611187
// data_mtx must be held by caller
@@ -1176,6 +1202,21 @@ namespace crimson {
11761202
delete_from_heap(client, limit_heap);
11771203
delete_from_heap(client, ready_heap);
11781204
}
1205+
1206+
public:
1207+
1208+
void add_idle_erase_listener(const IdleEraseListener& listener) {
1209+
DataGuard g(data_mtx);
1210+
if (!finishing) {
1211+
idle_erase_listeners.insert(&listener);
1212+
}
1213+
}
1214+
1215+
1216+
void remove_idle_erase_listener(const IdleEraseListener& listener) {
1217+
DataGuard g(data_mtx);
1218+
idle_erase_listeners.erase(&listener);
1219+
}
11791220
}; // class PriorityQueueBase
11801221

11811222

@@ -1487,7 +1528,6 @@ namespace crimson {
14871528
sched_ahead_thd.join();
14881529
}
14891530

1490-
public:
14911531

14921532
inline void add_request(R&& request,
14931533
const C& client_id,

support/src/indirect_intrusive_heap.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -227,6 +227,8 @@ namespace crimson {
227227

228228
bool empty() const { return 0 == count; }
229229

230+
void clear() { data.clear(); count = 0; }
231+
230232
size_t size() const { return (size_t) count; }
231233

232234
T& top() { return *data[0]; }

test/test_dmclock_server.cc

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
#include <iostream>
1919
#include <list>
2020
#include <vector>
21+
#include <atomic>
2122

2223

2324
#include "dmclock_server.h"
@@ -97,6 +98,7 @@ namespace crimson {
9798
using Queue = dmc::PushPriorityQueue<ClientId,Request>;
9899
int client = 17;
99100
double reservation = 100.0;
101+
std::atomic<std::uint32_t> idle_erase_counter(0u);
100102

101103
dmc::ClientInfo ci(reservation, 1.0, 0.0);
102104
auto client_info_f = [&] (ClientId c) -> const dmc::ClientInfo* {
@@ -109,6 +111,9 @@ namespace crimson {
109111
uint64_t req_cost) {
110112
// empty; do nothing
111113
};
114+
auto idle_erase_listener_f = [&idle_erase_counter](const ClientId& c) {
115+
idle_erase_counter++;
116+
};
112117

113118
Queue pq(client_info_f,
114119
server_ready_f,
@@ -117,6 +122,7 @@ namespace crimson {
117122
std::chrono::seconds(5),
118123
std::chrono::seconds(2),
119124
false);
125+
pq.add_idle_erase_listener(idle_erase_listener_f);
120126

121127
auto lock_pq = [&](std::function<void()> code) {
122128
test_locked(pq.data_mtx, code);
@@ -171,12 +177,19 @@ namespace crimson {
171177
"after idle age client map entry shows idle.";
172178
});
173179

180+
EXPECT_EQ(0u, idle_erase_counter) <<
181+
"idle erase counter is still 0 since client has not yet been "
182+
"idle-erased";
183+
174184
std::this_thread::sleep_for(std::chrono::seconds(2));
175185

176186
lock_pq([&] () {
177187
EXPECT_EQ(0u, pq.client_map.size()) <<
178188
"client map loses its entry after erase age";
179189
});
190+
191+
EXPECT_EQ(1u, idle_erase_counter) <<
192+
"idle erase counter is now 1 since client has been idle-erased";
180193
} // TEST
181194

182195

0 commit comments

Comments
 (0)