@@ -5,6 +5,18 @@ import (
5
5
"github.com/coreos/etcd/raft"
6
6
"github.com/coreos/etcd/raft/raftpb"
7
7
"github.com/coreos/etcd/wal"
8
+
9
+ "context"
10
+ "encoding/json"
11
+ stats "github.com/coreos/etcd/etcdserver/api/v2stats"
12
+ "github.com/coreos/etcd/pkg/types"
13
+ "github.com/dailyhunt/airdb/operation"
14
+ logger "github.com/sirupsen/logrus"
15
+ "go.uber.org/zap"
16
+ "net/http"
17
+ "net/url"
18
+ "strconv"
19
+ "time"
8
20
)
9
21
10
22
type RaftNode struct {
@@ -15,9 +27,9 @@ type RaftNode struct {
15
27
16
28
// TODO: (shailendra)channel should be of other type instead of string
17
29
// TODO: Type still needed to decide
18
- proposeCh <- chan string // proposed key value
30
+ proposeCh <- chan operation. Op // proposed key value
19
31
raftConfChangeCh <- chan raftpb.ConfChange // proposed raft/region config change
20
- commitLogCh chan <- * string // entries committed to log/wal for (key,value)
32
+ commitLogCh chan <- operation. Op // entries committed to log/wal for (key,value)
21
33
errorCh chan <- error // errors from raft session
22
34
23
35
// wal
@@ -28,40 +40,320 @@ type RaftNode struct {
28
40
snapDir string
29
41
30
42
// raft
31
- raftState raftpb.ConfState
32
- raftNode raft.Node
33
- raftStorage * raft.MemoryStorage
34
- raftTransport * rafthttp.Transport
43
+ raftState raftpb.ConfState
44
+ raftNode raft.Node
45
+ raftStorage * raft.MemoryStorage
46
+ raftTransport * rafthttp.Transport
47
+ lastAppliedIndex uint64
48
+ lastIndex uint64 // index of log at start
35
49
36
50
// Explicit cancellation of channels
37
51
stopCh chan struct {}
38
52
httpStopCh chan struct {}
39
53
httpDoneCh chan struct {}
40
54
}
41
55
42
- func newRaftNode () * RaftNode {
43
- return nil
56
+ // Todo(sohan): Error handling
57
+ func NewRaft (id int , peers []string ) * RaftNode {
58
+
59
+ rn := & RaftNode {
60
+ ID : id ,
61
+ peers : peers ,
62
+ proposeCh : make (chan operation.Op ),
63
+ raftConfChangeCh : make (chan raftpb.ConfChange ),
64
+ commitLogCh : make (chan operation.Op ),
65
+ errorCh : make (chan error ),
66
+
67
+ stopCh : make (chan struct {}),
68
+ httpStopCh : make (chan struct {}),
69
+ httpDoneCh : make (chan struct {}),
70
+ }
71
+
72
+ go rn .startRaft ()
73
+ go rn .serveChannels ()
74
+
75
+ return rn
44
76
45
77
}
46
78
47
79
func (rc * RaftNode ) startRaft () {
48
80
81
+ // Todo(sohan): Snapshot
82
+ // Todo(sohan): WAL
83
+
84
+ // Todo(sohan): Raft Config for peers
85
+ rcPeers := make ([]raft.Peer , len (rc .peers ))
86
+ for i := range rc .peers {
87
+ rcPeers [i ] = raft.Peer {ID : uint64 (i + 1 )} // Todo - doc
88
+ }
89
+
90
+ // TODO(sohan) - set read only replica using learners array
91
+ logger .Info ("Creating Raft Config for " , rc .ID )
92
+
93
+ // Todo(Sohan) - take config from Config file
94
+ rConfig := & raft.Config {
95
+ ID : uint64 (rc .ID ),
96
+ ElectionTick : 10 ,
97
+ HeartbeatTick : 1 ,
98
+ Storage : rc .raftStorage ,
99
+ MaxSizePerMsg : 1024 * 1024 ,
100
+ MaxInflightMsgs : 256 ,
101
+ }
102
+
103
+ // Todo - Start vs restart node
104
+ startPeers := rcPeers
105
+ if rc .join {
106
+ startPeers = nil
107
+ }
108
+ rc .raftNode = raft .StartNode (rConfig , startPeers )
109
+
110
+ // Todo(sohan): Raft Transport
111
+ rc .raftTransport = & rafthttp.Transport {
112
+ Logger : zap .NewExample (),
113
+ ID : types .ID (rc .ID ),
114
+ ClusterID : 0x1000 ,
115
+ Raft : rc ,
116
+ ServerStats : stats .NewServerStats ("" , "" ),
117
+ LeaderStats : stats .NewLeaderStats (strconv .Itoa (rc .ID )),
118
+ ErrorC : make (chan error ),
119
+ }
120
+
121
+ logger .WithFields (logger.Fields {
122
+ "clusterId" : rc .raftTransport .ClusterID ,
123
+ "transportId" : rc .raftTransport .ID ,
124
+ }).Info ("Starting Raft Transport" )
125
+
126
+ rc .raftTransport .Start ()
127
+
128
+ for i := range rc .peers {
129
+ if i + 1 != rc .ID {
130
+ logger .WithFields (logger.Fields {
131
+ "id" : rc .raftTransport .ID ,
132
+ "peerId" : rc .peers [i ],
133
+ "clusterId" : rc .raftTransport .ClusterID ,
134
+ }).Info ("Adding Peer to Transport" )
135
+
136
+ rc .raftTransport .AddPeer (types .ID (i + 1 ), []string {rc .peers [i ]})
137
+ }
138
+ }
139
+
140
+ go rc .serveRaft ()
141
+ go rc .serveChannels ()
142
+
143
+ }
144
+
145
+ // Todo: Doc
146
+ func (rc * RaftNode ) serveRaft () {
147
+ logger .Info ("Starting Serving Raft transport" , rc .ID )
148
+ transportAddress := rc .peers [rc .ID - 1 ]
149
+ url , err := url .Parse (transportAddress )
150
+ if err != nil {
151
+ logger .Fatalf ("Failing parsing URL for raft transport (%v)" , err )
152
+ }
153
+ logger .Info ("Raft Transport Address " , url )
154
+ listener , err := newRaftListener (url .Host , rc .stopCh )
155
+ if err != nil {
156
+ logger .Fatalf ("Failed to listen for raft http : (%v)" , err )
157
+ }
158
+
159
+ server := & http.Server {Handler : rc .raftTransport .Handler ()}
160
+ err = server .Serve (listener )
161
+
162
+ select {
163
+ case <- rc .httpStopCh :
164
+ default :
165
+ logger .Fatalf ("Failed to server raft http (%v)" , err )
166
+ }
167
+
168
+ close (rc .httpDoneCh )
169
+
170
+ }
171
+
172
+ func (rc * RaftNode ) serveChannels () {
173
+ // Todo : Wal and snapshot related
174
+
175
+ // Todo(sohan): Take from config
176
+ ticker := time .NewTicker (100 * time .Millisecond )
177
+ defer ticker .Stop ()
178
+
179
+ go rc .proposeToRaft ()
180
+
181
+ // Event loop on raft state machine updates
182
+ for {
183
+ select {
184
+ case <- ticker .C :
185
+ rc .raftNode .Tick ()
186
+
187
+ // store raft entries to wal, then publish over commit channel
188
+ case rd := <- rc .raftNode .Ready ():
189
+ // Todo:
190
+ //rc.wal.Save(rd.HardState, rd.Entries)
191
+ // Todo: Save to WAL
192
+ // Todo: Snapshot stuff
193
+ rc .raftStorage .Append (rd .Entries )
194
+ rc .raftTransport .Send (rd .Messages )
195
+
196
+ outEntries := rc .entriesToApply (rd .CommittedEntries )
197
+ if ok := rc .publishEntries (outEntries ); ! ok {
198
+ rc .stopRaft ()
199
+ return
200
+ }
201
+
202
+ // todo : Trigger snapshot
203
+ rc .raftNode .Advance ()
204
+ case err := <- rc .raftTransport .ErrorC :
205
+ rc .writeError (err )
206
+ return
207
+ case <- rc .stopCh :
208
+ rc .stopRaft ()
209
+ return
210
+ }
211
+
212
+ } //select end
213
+
214
+ }
215
+
216
+ func (rc * RaftNode ) proposeToRaft () {
217
+ var configChangeCount uint64 = 0 // for config change id
218
+
219
+ for rc .proposeCh != nil && rc .raftConfChangeCh != nil {
220
+ select {
221
+ case proposed , ok := <- rc .proposeCh :
222
+ if ! ok {
223
+ rc .proposeCh = nil
224
+ } else {
225
+ // Todo(sohan)- switch based on type
226
+ op := proposed .Name ()
227
+ switch op {
228
+ case "PUT" :
229
+ logger .Debug ("Processing Put in proposeRaft " , proposed .String ())
230
+ rc .raftNode .Propose (context .TODO (), []byte (proposed .String ()))
231
+ default :
232
+ // TODO
233
+ logger .Warn ("Invalid operation" )
234
+ }
235
+
236
+ }
237
+
238
+ case configChange , ok := <- rc .raftConfChangeCh :
239
+ if ! ok {
240
+ rc .raftConfChangeCh = nil
241
+ } else {
242
+ configChangeCount += 1
243
+ configChange .ID = configChangeCount
244
+ // Todo : check for why use context.TODO()
245
+ rc .raftNode .ProposeConfChange (context .TODO (), configChange )
246
+ }
247
+ } // select end
248
+ } // for end
249
+
250
+ // client closed channel; shutdown raft if not already
251
+ close (rc .stopCh )
252
+
49
253
}
50
254
51
255
func (rc * RaftNode ) stopRaft () {
256
+ rc .stopHTTP ()
257
+ close (rc .commitLogCh )
258
+ close (rc .errorCh )
259
+ rc .raftNode .Stop ()
260
+ }
261
+
262
+ func (rc * RaftNode ) writeError (err error ) {
263
+ rc .stopHTTP ()
264
+ close (rc .commitLogCh )
265
+ rc .errorCh <- err
266
+ close (rc .errorCh )
267
+ rc .raftNode .Stop ()
268
+ }
52
269
270
+ func (rc * RaftNode ) stopHTTP () {
271
+ rc .raftTransport .Stop ()
272
+ close (rc .httpStopCh )
273
+ <- rc .httpDoneCh
53
274
}
54
275
55
276
// Find out entries to apply/publish
56
277
// Ignore old entries from last index
57
- func (rc * RaftNode ) entriesToApply () {
278
+ func (rc * RaftNode ) entriesToApply (inEntries []raftpb.Entry ) (outEntries []raftpb.Entry ) {
279
+ if len (inEntries ) == 0 {
280
+ return
281
+ }
282
+
283
+ firstIndex := inEntries [0 ].Index
284
+ if firstIndex > rc .lastAppliedIndex + 1 {
285
+ logger .Fatalf ("first index of committed entry[%d] should <= progress.appliedIndex[%d]+1" , firstIndex , rc .lastAppliedIndex )
286
+ }
287
+
288
+ if rc .lastAppliedIndex - firstIndex + 1 < uint64 (len (inEntries )) {
289
+ // Slice after verification
290
+ outEntries = inEntries [rc .lastAppliedIndex - firstIndex + 1 :]
291
+ }
292
+
293
+ return outEntries
58
294
59
295
}
60
296
61
297
// Write committed logs to commit Channel
62
298
// Return if all entries could be published
63
- func (rc * RaftNode ) publishEntries () {
299
+ func (rc * RaftNode ) publishEntries (entries []raftpb.Entry ) bool {
300
+ for i := range entries {
301
+ entry := entries [i ]
302
+ entryType := entry .Type
64
303
304
+ switch entryType {
305
+ case raftpb .EntryNormal :
306
+ if len (entry .Data ) == 0 {
307
+ break // Ignore empty entries
308
+ }
309
+
310
+ // Todo: Handle different operations based on type
311
+ // Todo: Handle Type of message as of now it is string
312
+ var put operation.Put
313
+ json .Unmarshal (entry .Data , & put )
314
+
315
+ select {
316
+ case rc .commitLogCh <- & put : // todo (pointer allocation)
317
+ case <- rc .stopCh :
318
+ return false
319
+ }
320
+
321
+ case raftpb .EntryConfChange :
322
+ var cc raftpb.ConfChange
323
+ cc .Unmarshal (entry .Data )
324
+ rc .raftState = * rc .raftNode .ApplyConfChange (cc )
325
+ switch cc .Type {
326
+ case raftpb .ConfChangeAddNode :
327
+ if len (cc .Context ) > 0 {
328
+ nodeId := types .ID (cc .NodeID )
329
+ us := []string {string (cc .Context )} // Todo : Log this
330
+ rc .raftTransport .AddPeer (nodeId , us )
331
+ }
332
+ case raftpb .ConfChangeRemoveNode :
333
+ if cc .NodeID == uint64 (rc .ID ) {
334
+ logger .Info ("I've been removed from the cluster! Shutting down." )
335
+ return false
336
+ }
337
+ rc .raftTransport .RemovePeer (types .ID (cc .NodeID ))
338
+ }
339
+
340
+ } // switch
341
+
342
+ // after commit, update appliedIndex
343
+ rc .lastAppliedIndex = entry .Index
344
+
345
+ // special nil commit to signal replay has finished
346
+ // Todo : why it is needed ??
347
+ if entry .Index == rc .lastIndex {
348
+ select {
349
+ case rc .commitLogCh <- nil :
350
+ case <- rc .stopCh :
351
+ return false
352
+ }
353
+ }
354
+ } // for
355
+
356
+ return true
65
357
}
66
358
67
359
func (rc * RaftNode ) openWal () {
@@ -71,3 +363,19 @@ func (rc *RaftNode) openWal() {
71
363
// replayWAL is a placeholder: its body is empty, so calling it has no effect.
// TODO: implement WAL replay.
func (rc *RaftNode) replayWAL() {
}
366
+
367
+ func (rc * RaftNode ) Process (ctx context.Context , m raftpb.Message ) error {
368
+ return rc .raftNode .Step (ctx , m )
369
+ }
370
+
371
+ func (rc * RaftNode ) IsIDRemoved (id uint64 ) bool {
372
+ panic ("implement me" )
373
+ }
374
+
375
+ func (rc * RaftNode ) ReportUnreachable (id uint64 ) {
376
+ panic ("implement me" )
377
+ }
378
+
379
+ func (rc * RaftNode ) ReportSnapshot (id uint64 , status raft.SnapshotStatus ) {
380
+ panic ("implement me" )
381
+ }
0 commit comments