Skip to content

Commit c9452ab

Browse files
committed
Added infrastructure code for processing commands from clients.
Added code to handle commands from clients in leader. A logEntry is created for the command and is appended to the leader log. The log gets propagated to other nodes in the cluster during its next heartbeat. Each node upon receipt of the AppendEntry request will update its local log.
1 parent 60a8ce0 commit c9452ab

File tree

6 files changed

+214
-30
lines changed

6 files changed

+214
-30
lines changed

src/cluster.c

Lines changed: 52 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -162,6 +162,7 @@ void clusterInit(void) {
162162
server.cluster->election_timeout = PREZ_CLUSTER_ELECTION_TIMEOUT;
163163
server.cluster->heartbeat_interval = PREZ_CLUSTER_HEARTBEAT_INTERVAL;
164164
server.cluster->synced_nodes = dictCreate(&clusterNodesDictType,NULL);
165+
server.cluster->start_index = 0;
165166
server.cluster->current_term = 0;
166167
server.cluster->commit_index = 0;
167168
server.cluster->votes_granted = 0;
@@ -286,7 +287,7 @@ void clusterAcceptHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
286287
anetEnableTcpNoDelay(NULL,cfd);
287288

288289
/* Use non-blocking I/O for cluster messages. */
289-
prezLog(PREZ_VERBOSE,"Accepted cluster node %s:%d", cip, cport);
290+
prezLog(PREZ_VERBOSE,"Accepted cluster node %s:%d cfd:%d", cip, cport, cfd);
290291
/* Create a link object we use to handle the connection.
291292
* It gets passed to the readable handler when data is available.
292293
* Initiallly the link->node pointer is set to NULL as we don't know
@@ -381,6 +382,28 @@ void clusterRenameNode(clusterNode *node, char *newname) {
381382
clusterAddNode(node);
382383
}
383384

385+
void clusterProcessCommand(prezClient *c) {
386+
logEntry entry;
387+
long long commit_index;
388+
389+
prezLog(PREZ_DEBUG,"clusterProcessCommand");
390+
entry.index = logCurrentIndex()+1;
391+
entry.term = server.cluster->current_term;
392+
memcpy(entry.commandName,c->cmd->name,strlen(c->cmd->name)+1);
393+
memcpy(entry.command,c->cmd->name,strlen(c->cmd->name)+1);
394+
395+
logWriteEntry(entry);
396+
397+
dictAdd(server.cluster->synced_nodes,
398+
sdsnewlen(myself->name,PREZ_CLUSTER_NAMELEN),&node_synced);
399+
400+
if(dictSize(server.cluster->nodes) == 1) {
401+
commit_index = logCurrentIndex();
402+
logCommitIndex(commit_index);
403+
prezLog(PREZ_DEBUG,"commit index: %lld", commit_index);
404+
}
405+
}
406+
384407
int clusterProcessPacket(clusterLink *link) {
385408
clusterMsg *hdr = (clusterMsg*) link->rcvbuf;
386409
uint32_t totlen = ntohl(hdr->totlen);
@@ -414,7 +437,7 @@ int clusterProcessPacket(clusterLink *link) {
414437
uint32_t explen;
415438
explen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
416439
explen += (sizeof(clusterMsgDataAppendEntries) +
417-
(hdr->data.appendentries.entries.log_entries_count-1) *
440+
(ntohs(hdr->data.appendentries.entries.log_entries_count)-1) *
418441
sizeof(logEntry));
419442
if (totlen != explen) return 1;
420443

@@ -442,7 +465,8 @@ int clusterProcessPacket(clusterLink *link) {
442465
explen += sizeof(clusterMsgDataResponseAppendEntries);
443466
if (totlen != explen) return 1;
444467

445-
prezLog(PREZ_DEBUG,"--- Received AppendEntriesResponse term %lld, index: %lld, commit_index: %lld, ok: %d",
468+
prezLog(PREZ_DEBUG,"--- Received AppendEntriesResponse port:%d term %lld, index: %lld, commit_index: %lld, ok: %d",
469+
link->node->port,
446470
hdr->data.responseappendentries.entries.term,
447471
hdr->data.responseappendentries.entries.index,
448472
hdr->data.responseappendentries.entries.commit_index,
@@ -767,6 +791,7 @@ void clusterProcessResponseAppendEntries(clusterLink *link,
767791
clusterNode *cnode = dictGetVal(de);
768792
log_indices[i] = cnode->prev_log_index;
769793
}
794+
dictReleaseIterator(di);
770795
qsort(log_indices,dictSize(server.cluster->nodes),sizeof(long long),
771796
compareIndices);
772797
reverseIndices(log_indices,dictSize(server.cluster->nodes));
@@ -874,7 +899,7 @@ void clusterSendAppendEntries(clusterLink *link) {
874899
memcpy(hdr->data.appendentries.entries.leaderid, myself->name,
875900
PREZ_CLUSTER_NAMELEN);
876901
hdr->data.appendentries.entries.prev_log_index = node->prev_log_index;
877-
hdr->data.appendentries.entries.prev_log_term = 0;
902+
hdr->data.appendentries.entries.prev_log_term = node->last_sent_term;
878903
hdr->data.appendentries.entries.leader_commit_index =
879904
server.cluster->commit_index;
880905
node->last_sent_entry = NULL;
@@ -889,8 +914,23 @@ void clusterSendAppendEntries(clusterLink *link) {
889914
node->prev_log_index-server.cluster->start_index);
890915
while(ln && logcount <= server.cluster->log_max_entries_per_request) {
891916
le_node = listNodeValue(ln);
892-
memcpy(&(hdr->data.appendentries.entries.log_entries[logcount]),
917+
hdr->data.appendentries.entries.log_entries[logcount].term =
918+
le_node->log_entry.term;
919+
hdr->data.appendentries.entries.log_entries[logcount].index =
920+
le_node->log_entry.index;
921+
memcpy(hdr->data.appendentries.entries.log_entries[logcount].commandName,
922+
le_node->log_entry.commandName, PREZ_COMMAND_NAMELEN);
923+
memcpy(hdr->data.appendentries.entries.log_entries[logcount].command,
924+
le_node->log_entry.command, PREZ_COMMAND_NAMELEN);
925+
prezLog(PREZ_DEBUG,"AE term:%lld, index:%lld, cmd:%s, cmd:%s",
926+
hdr->data.appendentries.entries.log_entries[logcount].term,
927+
hdr->data.appendentries.entries.log_entries[logcount].index,
928+
hdr->data.appendentries.entries.log_entries[logcount].commandName,
929+
hdr->data.appendentries.entries.log_entries[logcount].command);
930+
931+
/*memcpy(&(hdr->data.appendentries.entries.log_entries[logcount]),
893932
&(le_node->log_entry), sizeof(logEntry));
933+
*/
894934
ln_next = listNextNode(ln);
895935
logcount++;
896936
if (!ln_next) {
@@ -901,17 +941,19 @@ void clusterSendAppendEntries(clusterLink *link) {
901941
ln = ln_next;
902942
}
903943
}
904-
hdr->data.appendentries.entries.log_entries_count = logcount;
944+
hdr->data.appendentries.entries.log_entries_count = htons(logcount);
905945
node->last_sent_term = server.cluster->current_term;
906946

907947
totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
908948
totlen += (sizeof(clusterMsgDataAppendEntries)-sizeof(logEntry));
909949
totlen += (sizeof(logEntry)*logcount);
910950
hdr->totlen = htonl(totlen);
911951

912-
prezLog(PREZ_DEBUG, "Sending heartbeat buf:%s, sizeof(clustermsg): "
913-
"%lu, totlen: %d\n",
914-
buf, sizeof(clusterMsg), ntohl(hdr->totlen));
952+
prezLog(PREZ_DEBUG, "Sending heartbeat to port:%d, buf:%s, sizeof(clustermsg): "
953+
"%lu, totlen: %d logcount: %d\n",
954+
node->port, buf, sizeof(clusterMsg),
955+
ntohl(hdr->totlen),
956+
ntohs(hdr->data.appendentries.entries.log_entries_count));
915957

916958
clusterSendMessage(link,buf,ntohl(hdr->totlen));
917959
}
@@ -1018,6 +1060,7 @@ void clusterCron(void) {
10181060
clusterSendHeartbeat(node->link);
10191061
}
10201062
}
1063+
dictReleaseIterator(di);
10211064
}
10221065

10231066
}

src/cluster.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -234,10 +234,11 @@ void clusterSendResponseAppendEntries(clusterLink *link, int ok);
234234
int loadLogFile(void);
235235
int logTruncate(long long index, long long term);
236236
sds catLogEntry(sds dst, int argc, robj **argv);
237-
int logWriteEntry(logEntry *e);
237+
int logWriteEntry(logEntry e);
238238
int logAppendEntries(clusterMsgDataAppendEntries entries);
239239
int logCommitIndex(long long index);
240240
int logSync(void);
241+
long long logCurrentIndex(void);
241242

242243
/* Functions as macros */
243244
#define quorumSize ((dictSize(server.cluster->nodes) / 2) + 1)

src/log.c

Lines changed: 38 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929

3030
#include "prez.h"
3131
#include "cluster.h"
32+
#include "endianconv.h"
3233

3334
#include <signal.h>
3435
#include <fcntl.h>
@@ -81,7 +82,7 @@ int loadLogFile(void) {
8182
argc = atoi(buf+1);
8283
if (argc < 1) goto fmterr;
8384

84-
entry = zmalloc(sizeof(entry));
85+
entry = zmalloc(sizeof(*entry));
8586
for (j = 0; j < argc; j++) {
8687
if (fgets(buf,sizeof(buf),fp) == NULL) goto readerr;
8788
if (buf[0] != '$') goto fmterr;
@@ -163,12 +164,12 @@ int logTruncate(long long index, long long term) {
163164
}
164165

165166
entry = listNodeValue(listIndex(server.cluster->log_entries,
166-
index - server.cluster->start_index));
167+
index - server.cluster->start_index-1));
167168
ftruncate(server.cluster->log_fd, entry->position);
168169
server.cluster->log_current_size = entry->position;
169170
listRewind(server.cluster->log_entries, &li);
170171
li.next = listIndex(server.cluster->log_entries,
171-
index - server.cluster->start_index - 1);
172+
index - server.cluster->start_index-1);
172173
while ((ln = listNext(&li)) != NULL) {
173174
logEntryNode *le = listNodeValue(ln);
174175
listDelNode(server.cluster->log_entries,ln);
@@ -203,31 +204,31 @@ sds catLogEntry(sds dst, int argc, robj **argv) {
203204
return dst;
204205
}
205206

206-
int logWriteEntry(logEntry *e) {
207+
int logWriteEntry(logEntry e) {
207208
ssize_t nwritten;
208209
robj *argv[4];
209210
sds buf = sdsempty();
210211
logEntryNode *en;
211212

212213
if (listLength(server.cluster->log_entries) > 0) {
213214
en = listNodeValue(listIndex(server.cluster->log_entries,-1));
214-
if (e->term < en->log_entry.term) {
215+
if (e.term < en->log_entry.term) {
215216
prezLog(PREZ_NOTICE, "Cannot append entry with earlier term."
216217
"term:%lld index:%lld, last term:%lld index:%lld",
217-
e->term, e->index, en->log_entry.term, en->log_entry.index);
218+
e.term, e.index, en->log_entry.term, en->log_entry.index);
218219
return PREZ_ERR;
219-
} else if (e->term == en->log_entry.term && e->index <= en->log_entry.index) {
220+
} else if (e.term == en->log_entry.term && e.index <= en->log_entry.index) {
220221
prezLog(PREZ_NOTICE, "Cannot append entry with earlier index."
221222
"term:%lld index:%lld, last term:%lld index:%lld",
222-
e->term, e->index, en->log_entry.term, en->log_entry.index);
223+
e.term, e.index, en->log_entry.term, en->log_entry.index);
223224
return PREZ_ERR;
224225
}
225226
}
226227

227-
argv[0] = createStringObjectFromLongLong(ntohl(e->index));
228-
argv[1] = createStringObjectFromLongLong(ntohl(e->term));
229-
argv[2] = createStringObject(e->commandName, PREZ_COMMAND_NAMELEN);
230-
argv[3] = createStringObject(e->command, PREZ_COMMAND_NAMELEN);
228+
argv[0] = createStringObjectFromLongLong(e.index);
229+
argv[1] = createStringObjectFromLongLong(e.term);
230+
argv[2] = createStringObject(e.commandName, PREZ_COMMAND_NAMELEN);
231+
argv[3] = createStringObject(e.command, PREZ_COMMAND_NAMELEN);
231232
buf = catLogEntry(buf, 4, argv);
232233
decrRefCount(argv[0]);
233234
decrRefCount(argv[1]);
@@ -238,10 +239,18 @@ int logWriteEntry(logEntry *e) {
238239
prezLog(PREZ_NOTICE,"log write incomplete");
239240
}
240241

241-
en = zmalloc(sizeof(en));
242-
memcpy(&(en->log_entry),e,sizeof(logEntry));
242+
en = zmalloc(sizeof(*en));
243+
en->log_entry.term = e.index;
244+
en->log_entry.index = e.term;
245+
memcpy(en->log_entry.commandName, e.commandName, PREZ_COMMAND_NAMELEN);
246+
memcpy(en->log_entry.command, e.command, PREZ_COMMAND_NAMELEN);
247+
//memcpy(&en->log_entry,&e,sizeof(logEntry));
243248
en->position = server.cluster->log_current_size;
244249
server.cluster->log_current_size += sdslen(buf);
250+
prezLog(PREZ_DEBUG,"en term:%lld/%lld index:%lld, %lu",
251+
en->log_entry.term,
252+
e.term,
253+
en->log_entry.index, sizeof(en));
245254
listAddNodeTail(server.cluster->log_entries,en);
246255

247256
return PREZ_OK;
@@ -252,8 +261,9 @@ int logAppendEntries(clusterMsgDataAppendEntries entries) {
252261
logEntry *e;
253262

254263
e = (logEntry*) entries.log_entries;
264+
prezLog(PREZ_DEBUG,"count:%d",ntohs(entries.log_entries_count));
255265
for(i=0;i<ntohs(entries.log_entries_count);i++) {
256-
if(logWriteEntry(e)) {
266+
if(logWriteEntry(entries.log_entries[i])) {
257267
prezLog(PREZ_NOTICE, "log write error");
258268
return PREZ_ERR;
259269
}
@@ -282,6 +292,7 @@ int logCommitIndex(long long index) {
282292
server.cluster->commit_index = entry->log_entry.index;
283293

284294
/* Process command which is what commit is really about */
295+
prezLog(PREZ_DEBUG,"cmd invoke");
285296
/* return if join command */
286297
}
287298
return PREZ_OK;
@@ -291,3 +302,15 @@ int logSync(void) {
291302
if(fsync(server.cluster->log_fd) == -1) return PREZ_ERR;
292303
return PREZ_OK;
293304
}
305+
306+
long long logCurrentIndex(void) {
307+
listNode *ln;
308+
logEntryNode *entry;
309+
310+
ln = listIndex(server.cluster->log_entries, -1);
311+
if (ln) {
312+
entry = ln->value;
313+
return(entry->log_entry.index);
314+
}
315+
return 0;
316+
}

src/networking.c

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1119,11 +1119,8 @@ void processInputBuffer(prezClient *c) {
11191119
resetClient(c);
11201120
} else {
11211121
/* Only reset the client when the command was executed. */
1122-
//FIXME: come back
1123-
#if 0
11241122
if (processCommand(c) == PREZ_OK)
11251123
resetClient(c);
1126-
#endif
11271124
}
11281125
}
11291126
}

0 commit comments

Comments
 (0)