Skip to content

Commit 49b75e9

Browse files
authored
Consolidate operations on LocalNode. (livekit#3140)
1 parent d341ee1 commit 49b75e9

File tree

15 files changed

+179
-93
lines changed

15 files changed

+179
-93
lines changed

cmd/server/main.go

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -272,7 +272,7 @@ func startServer(c *cli.Context) error {
272272
return err
273273
}
274274

275-
if err := prometheus.Init(currentNode.Id, currentNode.Type); err != nil {
275+
if err := prometheus.Init(string(currentNode.NodeID()), currentNode.NodeType()); err != nil {
276276
return err
277277
}
278278

pkg/agent/testutils/server.go

Lines changed: 3 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -15,6 +15,7 @@ import (
1515

1616
"github.com/livekit/livekit-server/pkg/agent"
1717
"github.com/livekit/livekit-server/pkg/config"
18+
"github.com/livekit/livekit-server/pkg/routing"
1819
"github.com/livekit/livekit-server/pkg/service"
1920
"github.com/livekit/protocol/auth"
2021
"github.com/livekit/protocol/livekit"
@@ -35,9 +36,10 @@ type TestServer struct {
3536
}
3637

3738
func NewTestServer(bus psrpc.MessageBus) *TestServer {
39+
localNode, _ := routing.NewLocalNode(nil)
3840
return NewTestServerWithService(must.Get(service.NewAgentService(
3941
&config.Config{Region: "test"},
40-
&livekit.Node{Id: guid.New("N_")},
42+
localNode,
4143
bus,
4244
auth.NewSimpleKeyProvider("test", "verysecretsecret"),
4345
)))

pkg/routing/localrouter.go

Lines changed: 9 additions & 17 deletions
Original file line number | Diff line number | Diff line change
@@ -23,7 +23,6 @@ import (
2323

2424
"github.com/livekit/protocol/livekit"
2525
"github.com/livekit/protocol/logger"
26-
"github.com/livekit/protocol/utils"
2726
)
2827

2928
var _ Router = (*LocalRouter)(nil)
@@ -56,10 +55,7 @@ func NewLocalRouter(
5655
}
5756

5857
func (r *LocalRouter) GetNodeForRoom(_ context.Context, _ livekit.RoomName) (*livekit.Node, error) {
59-
r.lock.Lock()
60-
defer r.lock.Unlock()
61-
node := utils.CloneProto((*livekit.Node)(r.currentNode))
62-
return node, nil
58+
return r.currentNode.Clone(), nil
6359
}
6460

6561
func (r *LocalRouter) SetNodeForRoom(_ context.Context, _ livekit.RoomName, _ livekit.NodeID) error {
@@ -83,28 +79,28 @@ func (r *LocalRouter) RemoveDeadNodes() error {
8379
}
8480

8581
func (r *LocalRouter) GetNode(nodeID livekit.NodeID) (*livekit.Node, error) {
86-
if nodeID == livekit.NodeID(r.currentNode.Id) {
87-
return r.currentNode, nil
82+
if nodeID == r.currentNode.NodeID() {
83+
return r.currentNode.Clone(), nil
8884
}
8985
return nil, ErrNotFound
9086
}
9187

9288
func (r *LocalRouter) ListNodes() ([]*livekit.Node, error) {
9389
return []*livekit.Node{
94-
r.currentNode,
90+
r.currentNode.Clone(),
9591
}, nil
9692
}
9793

9894
func (r *LocalRouter) CreateRoom(ctx context.Context, req *livekit.CreateRoomRequest) (res *livekit.Room, err error) {
99-
return r.CreateRoomWithNodeID(ctx, req, livekit.NodeID(r.currentNode.Id))
95+
return r.CreateRoomWithNodeID(ctx, req, r.currentNode.NodeID())
10096
}
10197

10298
func (r *LocalRouter) CreateRoomWithNodeID(ctx context.Context, req *livekit.CreateRoomRequest, nodeID livekit.NodeID) (res *livekit.Room, err error) {
10399
return r.roomManagerClient.CreateRoom(ctx, nodeID, req)
104100
}
105101

106102
func (r *LocalRouter) StartParticipantSignal(ctx context.Context, roomName livekit.RoomName, pi ParticipantInit) (res StartParticipantSignalResults, err error) {
107-
return r.StartParticipantSignalWithNodeID(ctx, roomName, pi, livekit.NodeID(r.currentNode.Id))
103+
return r.StartParticipantSignalWithNodeID(ctx, roomName, pi, r.currentNode.NodeID())
108104
}
109105

110106
func (r *LocalRouter) StartParticipantSignalWithNodeID(ctx context.Context, roomName livekit.RoomName, pi ParticipantInit, nodeID livekit.NodeID) (res StartParticipantSignalResults, err error) {
@@ -136,15 +132,13 @@ func (r *LocalRouter) Start() error {
136132
}
137133

138134
func (r *LocalRouter) Drain() {
139-
r.lock.Lock()
140-
defer r.lock.Unlock()
141-
r.currentNode.State = livekit.NodeState_SHUTTING_DOWN
135+
r.currentNode.SetState(livekit.NodeState_SHUTTING_DOWN)
142136
}
143137

144138
func (r *LocalRouter) Stop() {}
145139

146140
func (r *LocalRouter) GetRegion() string {
147-
return r.currentNode.Region
141+
return r.currentNode.Region()
148142
}
149143

150144
func (r *LocalRouter) statsWorker() {
@@ -154,9 +148,7 @@ func (r *LocalRouter) statsWorker() {
154148
}
155149
// update every 10 seconds
156150
<-time.After(statsUpdateInterval)
157-
r.lock.Lock()
158-
r.currentNode.Stats.UpdatedAt = time.Now().Unix()
159-
r.lock.Unlock()
151+
r.currentNode.UpdateNodeStats()
160152
}
161153
}
162154

pkg/routing/node.go

Lines changed: 126 additions & 13 deletions
Original file line number | Diff line number | Diff line change
@@ -16,33 +16,146 @@ package routing
1616

1717
import (
1818
"runtime"
19+
"sync"
1920
"time"
2021

2122
"github.com/livekit/protocol/livekit"
23+
"github.com/livekit/protocol/logger"
2224
"github.com/livekit/protocol/utils"
2325
"github.com/livekit/protocol/utils/guid"
2426

2527
"github.com/livekit/livekit-server/pkg/config"
28+
"github.com/livekit/livekit-server/pkg/telemetry/prometheus"
2629
)
2730

28-
type LocalNode *livekit.Node
31+
type LocalNode interface {
32+
Clone() *livekit.Node
33+
SetNodeID(nodeID livekit.NodeID)
34+
NodeID() livekit.NodeID
35+
NodeType() livekit.NodeType
36+
NodeIP() string
37+
Region() string
38+
SetState(state livekit.NodeState)
39+
SetStats(stats *livekit.NodeStats)
40+
UpdateNodeStats() bool
41+
SecondsSinceNodeStatsUpdate() float64
42+
}
43+
44+
type LocalNodeImpl struct {
45+
lock sync.RWMutex
46+
node *livekit.Node
2947

30-
func NewLocalNode(conf *config.Config) (LocalNode, error) {
48+
// previous stats for computing averages
49+
prevStats *livekit.NodeStats
50+
}
51+
52+
func NewLocalNode(conf *config.Config) (*LocalNodeImpl, error) {
3153
nodeID := guid.New(utils.NodePrefix)
32-
if conf.RTC.NodeIP == "" {
54+
if conf != nil && conf.RTC.NodeIP == "" {
3355
return nil, ErrIPNotSet
3456
}
35-
node := &livekit.Node{
36-
Id: nodeID,
37-
Ip: conf.RTC.NodeIP,
38-
NumCpus: uint32(runtime.NumCPU()),
39-
Region: conf.Region,
40-
State: livekit.NodeState_SERVING,
41-
Stats: &livekit.NodeStats{
42-
StartedAt: time.Now().Unix(),
43-
UpdatedAt: time.Now().Unix(),
57+
l := &LocalNodeImpl{
58+
node: &livekit.Node{
59+
Id: nodeID,
60+
NumCpus: uint32(runtime.NumCPU()),
61+
State: livekit.NodeState_SERVING,
62+
Stats: &livekit.NodeStats{
63+
StartedAt: time.Now().Unix(),
64+
UpdatedAt: time.Now().Unix(),
65+
},
4466
},
4567
}
68+
if conf != nil {
69+
l.node.Ip = conf.RTC.NodeIP
70+
l.node.Region = conf.Region
71+
}
72+
return l, nil
73+
}
74+
75+
func NewLocalNodeFromNodeProto(node *livekit.Node) (*LocalNodeImpl, error) {
76+
return &LocalNodeImpl{node: utils.CloneProto(node)}, nil
77+
}
78+
79+
func (l *LocalNodeImpl) Clone() *livekit.Node {
80+
l.lock.RLock()
81+
defer l.lock.RUnlock()
82+
83+
return utils.CloneProto(l.node)
84+
}
85+
86+
// for testing only
87+
func (l *LocalNodeImpl) SetNodeID(nodeID livekit.NodeID) {
88+
l.lock.Lock()
89+
defer l.lock.Unlock()
90+
91+
l.node.Id = string(nodeID)
92+
}
93+
94+
func (l *LocalNodeImpl) NodeID() livekit.NodeID {
95+
l.lock.RLock()
96+
defer l.lock.RUnlock()
97+
98+
return livekit.NodeID(l.node.Id)
99+
}
100+
101+
func (l *LocalNodeImpl) NodeType() livekit.NodeType {
102+
l.lock.RLock()
103+
defer l.lock.RUnlock()
104+
105+
return l.node.Type
106+
}
107+
108+
func (l *LocalNodeImpl) NodeIP() string {
109+
l.lock.RLock()
110+
defer l.lock.RUnlock()
111+
112+
return l.node.Ip
113+
}
114+
115+
func (l *LocalNodeImpl) Region() string {
116+
l.lock.RLock()
117+
defer l.lock.RUnlock()
118+
119+
return l.node.Region
120+
}
121+
122+
func (l *LocalNodeImpl) SetState(state livekit.NodeState) {
123+
l.lock.Lock()
124+
defer l.lock.Unlock()
125+
126+
l.node.State = state
127+
}
128+
129+
// for testing only
130+
func (l *LocalNodeImpl) SetStats(stats *livekit.NodeStats) {
131+
l.lock.Lock()
132+
defer l.lock.Unlock()
133+
134+
l.node.Stats = utils.CloneProto(stats)
135+
}
136+
137+
func (l *LocalNodeImpl) UpdateNodeStats() bool {
138+
l.lock.Lock()
139+
defer l.lock.Unlock()
140+
141+
if l.prevStats == nil {
142+
l.prevStats = l.node.Stats
143+
}
144+
updated, computedAvg, err := prometheus.GetUpdatedNodeStats(l.node.Stats, l.prevStats)
145+
if err != nil {
146+
logger.Errorw("could not update node stats", err)
147+
return false
148+
}
149+
l.node.Stats = updated
150+
if computedAvg {
151+
l.prevStats = updated
152+
}
153+
return true
154+
}
155+
156+
func (l *LocalNodeImpl) SecondsSinceNodeStatsUpdate() float64 {
157+
l.lock.RLock()
158+
defer l.lock.RUnlock()
46159

47-
return node, nil
160+
return time.Since(time.Unix(0, l.node.Stats.UpdatedAt)).Seconds()
48161
}

pkg/routing/redisrouter.go

Lines changed: 10 additions & 35 deletions
Original file line number | Diff line number | Diff line change
@@ -18,7 +18,6 @@ import (
1818
"bytes"
1919
"context"
2020
"runtime/pprof"
21-
"sync"
2221
"time"
2322

2423
"github.com/pkg/errors"
@@ -31,14 +30,13 @@ import (
3130
"github.com/livekit/protocol/rpc"
3231

3332
"github.com/livekit/livekit-server/pkg/routing/selector"
34-
"github.com/livekit/livekit-server/pkg/telemetry/prometheus"
3533
)
3634

3735
const (
3836
// expire participant mappings after a day
3937
participantMappingTTL = 24 * time.Hour
4038
statsUpdateInterval = 2 * time.Second
41-
statsMaxDelaySeconds = 30
39+
statsMaxDelaySeconds = float64(30)
4240

4341
// hash of node_id => Node proto
4442
NodesKey = "nodes"
@@ -59,9 +57,6 @@ type RedisRouter struct {
5957
kps rpc.KeepalivePubSub
6058
ctx context.Context
6159
isStarted atomic.Bool
62-
nodeMu sync.RWMutex
63-
// previous stats for computing averages
64-
prevStats *livekit.NodeStats
6560

6661
cancel func()
6762
}
@@ -77,21 +72,19 @@ func NewRedisRouter(lr *LocalRouter, rc redis.UniversalClient, kps rpc.Keepalive
7772
}
7873

7974
func (r *RedisRouter) RegisterNode() error {
80-
r.nodeMu.RLock()
81-
data, err := proto.Marshal((*livekit.Node)(r.currentNode))
82-
r.nodeMu.RUnlock()
75+
data, err := proto.Marshal(r.currentNode.Clone())
8376
if err != nil {
8477
return err
8578
}
86-
if err := r.rc.HSet(r.ctx, NodesKey, r.currentNode.Id, data).Err(); err != nil {
79+
if err := r.rc.HSet(r.ctx, NodesKey, string(r.currentNode.NodeID()), data).Err(); err != nil {
8780
return errors.Wrap(err, "could not register node")
8881
}
8982
return nil
9083
}
9184

9285
func (r *RedisRouter) UnregisterNode() error {
9386
// could be called after Stop(), so we'd want to use an unrelated context
94-
return r.rc.HDel(context.Background(), NodesKey, r.currentNode.Id).Err()
87+
return r.rc.HDel(context.Background(), NodesKey, string(r.currentNode.NodeID())).Err()
9588
}
9689

9790
func (r *RedisRouter) RemoveDeadNodes() error {
@@ -195,11 +188,9 @@ func (r *RedisRouter) Start() error {
195188
}
196189

197190
func (r *RedisRouter) Drain() {
198-
r.nodeMu.Lock()
199-
r.currentNode.State = livekit.NodeState_SHUTTING_DOWN
200-
r.nodeMu.Unlock()
191+
r.currentNode.SetState(livekit.NodeState_SHUTTING_DOWN)
201192
if err := r.RegisterNode(); err != nil {
202-
logger.Errorw("failed to mark as draining", err, "nodeID", r.currentNode.Id)
193+
logger.Errorw("failed to mark as draining", err, "nodeID", r.currentNode.NodeID())
203194
}
204195
}
205196

@@ -219,13 +210,9 @@ func (r *RedisRouter) statsWorker() {
219210
// update periodically
220211
select {
221212
case <-time.After(statsUpdateInterval):
222-
r.kps.PublishPing(r.ctx, livekit.NodeID(r.currentNode.Id), &rpc.KeepalivePing{Timestamp: time.Now().Unix()})
213+
r.kps.PublishPing(r.ctx, r.currentNode.NodeID(), &rpc.KeepalivePing{Timestamp: time.Now().Unix()})
223214

224-
r.nodeMu.RLock()
225-
stats := r.currentNode.Stats
226-
r.nodeMu.RUnlock()
227-
228-
delaySeconds := time.Now().Unix() - stats.UpdatedAt
215+
delaySeconds := r.currentNode.SecondsSinceNodeStatsUpdate()
229216
if delaySeconds > statsMaxDelaySeconds {
230217
if !goroutineDumped {
231218
goroutineDumped = true
@@ -245,7 +232,7 @@ func (r *RedisRouter) statsWorker() {
245232
}
246233

247234
func (r *RedisRouter) keepaliveWorker(startedChan chan error) {
248-
pings, err := r.kps.SubscribePing(r.ctx, livekit.NodeID(r.currentNode.Id))
235+
pings, err := r.kps.SubscribePing(r.ctx, r.currentNode.NodeID())
249236
if err != nil {
250237
startedChan <- err
251238
return
@@ -258,21 +245,9 @@ func (r *RedisRouter) keepaliveWorker(startedChan chan error) {
258245
continue
259246
}
260247

261-
r.nodeMu.Lock()
262-
if r.prevStats == nil {
263-
r.prevStats = r.currentNode.Stats
264-
}
265-
updated, computedAvg, err := prometheus.GetUpdatedNodeStats(r.currentNode.Stats, r.prevStats)
266-
if err != nil {
267-
logger.Errorw("could not update node stats", err)
268-
r.nodeMu.Unlock()
248+
if !r.currentNode.UpdateNodeStats() {
269249
continue
270250
}
271-
r.currentNode.Stats = updated
272-
if computedAvg {
273-
r.prevStats = updated
274-
}
275-
r.nodeMu.Unlock()
276251

277252
// TODO: check stats against config.Limit values
278253
if err := r.RegisterNode(); err != nil {

0 commit comments

Comments
 (0)