Skip to content

Commit fcbf543

Browse files
committed
Fix index in AppendEntries Response and logTruncate logic.
Fixed index to use the current index in the logEntry list for the AppendEntries Response. In logTruncate, truncate upto the index requested (excluding the index).
1 parent c9452ab commit fcbf543

File tree

2 files changed

+25
-14
lines changed

2 files changed

+25
-14
lines changed

src/cluster.c

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -396,6 +396,7 @@ void clusterProcessCommand(prezClient *c) {
396396

397397
dictAdd(server.cluster->synced_nodes,
398398
sdsnewlen(myself->name,PREZ_CLUSTER_NAMELEN),&node_synced);
399+
myself->prev_log_index = entry.index;
399400

400401
if(dictSize(server.cluster->nodes) == 1) {
401402
commit_index = logCurrentIndex();
@@ -789,6 +790,8 @@ void clusterProcessResponseAppendEntries(clusterLink *link,
789790
dictSize(server.cluster->nodes));
790791
while((de = dictNext(di)) != NULL) {
791792
clusterNode *cnode = dictGetVal(de);
793+
prezLog(PREZ_DEBUG,"node, port:%d, prev_index:%lld",
794+
cnode->port,cnode->prev_log_index);
792795
log_indices[i] = cnode->prev_log_index;
793796
}
794797
dictReleaseIterator(di);
@@ -799,7 +802,7 @@ void clusterProcessResponseAppendEntries(clusterLink *link,
799802
if (commit_index > server.cluster->commit_index) {
800803
logSync();
801804
logCommitIndex(commit_index);
802-
prezLog(PREZ_DEBUG, "commit index: %lld", commit_index);
805+
prezLog(PREZ_DEBUG, "AE Response commit index: %lld", commit_index);
803806
}
804807
zfree(log_indices);
805808
}
@@ -880,7 +883,7 @@ void clusterSendResponseAppendEntries(clusterLink *link, int ok) {
880883
prezLog(PREZ_DEBUG, "Sending AppendEntries Response buf:%s, sizeof(clustermsg): %lu, totlen: %d\n",
881884
buf, sizeof(clusterMsg), ntohl(hdr->totlen));
882885
hdr->data.responseappendentries.entries.term = server.cluster->current_term;
883-
hdr->data.responseappendentries.entries.index = server.cluster->current_index;
886+
hdr->data.responseappendentries.entries.index = logCurrentIndex();
884887
hdr->data.responseappendentries.entries.commit_index = server.cluster->commit_index;
885888
hdr->data.responseappendentries.entries.ok = ok;
886889
clusterSendMessage(link,buf,ntohl(hdr->totlen));

src/log.c

Lines changed: 20 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -163,17 +163,20 @@ int logTruncate(long long index, long long term) {
163163
return PREZ_ERR;
164164
}
165165

166-
entry = listNodeValue(listIndex(server.cluster->log_entries,
167-
index - server.cluster->start_index-1));
168-
ftruncate(server.cluster->log_fd, entry->position);
169-
server.cluster->log_current_size = entry->position;
170-
listRewind(server.cluster->log_entries, &li);
171-
li.next = listIndex(server.cluster->log_entries,
172-
index - server.cluster->start_index-1);
173-
while ((ln = listNext(&li)) != NULL) {
174-
logEntryNode *le = listNodeValue(ln);
175-
listDelNode(server.cluster->log_entries,ln);
176-
zfree(le);
166+
if (index < server.cluster->start_index +
167+
listLength(server.cluster->log_entries)) {
168+
entry = listNodeValue(listIndex(server.cluster->log_entries,
169+
index - server.cluster->start_index-1));
170+
ftruncate(server.cluster->log_fd, entry->position);
171+
server.cluster->log_current_size = entry->position;
172+
listRewind(server.cluster->log_entries, &li);
173+
li.next = listIndex(server.cluster->log_entries,
174+
index - server.cluster->start_index-1);
175+
while ((ln = listNext(&li)) != NULL) {
176+
logEntryNode *le = listNodeValue(ln);
177+
listDelNode(server.cluster->log_entries,ln);
178+
zfree(le);
179+
}
177180
}
178181
}
179182
return PREZ_OK;
@@ -247,11 +250,13 @@ int logWriteEntry(logEntry e) {
247250
//memcpy(&en->log_entry,&e,sizeof(logEntry));
248251
en->position = server.cluster->log_current_size;
249252
server.cluster->log_current_size += sdslen(buf);
250-
prezLog(PREZ_DEBUG,"en term:%lld/%lld index:%lld, %lu",
253+
prezLog(PREZ_DEBUG,"Add to List: en term:%lld/%lld index:%lld, %lu",
251254
en->log_entry.term,
252255
e.term,
253256
en->log_entry.index, sizeof(en));
254257
listAddNodeTail(server.cluster->log_entries,en);
258+
prezLog(PREZ_DEBUG,"listLength:%lu",
259+
listLength(server.cluster->log_entries));
255260

256261
return PREZ_OK;
257262
}
@@ -310,7 +315,10 @@ long long logCurrentIndex(void) {
310315
ln = listIndex(server.cluster->log_entries, -1);
311316
if (ln) {
312317
entry = ln->value;
318+
prezLog(PREZ_DEBUG,"currentIndex:%lld",
319+
entry->log_entry.index);
313320
return(entry->log_entry.index);
314321
}
322+
prezLog(PREZ_DEBUG,"currentIndex:0");
315323
return 0;
316324
}

0 commit comments

Comments
 (0)