Skip to content

Changes to prevent deadlocks and loops on elections #7

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 11 additions & 7 deletions lib/candidate.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ module.exports = Candidate;
function Candidate(node, options) {
State.call(this, node);
this.options = options;
this.node.commonState.persisted.votedFor = this.node.id;

this._startVoting();
}
Expand All @@ -27,7 +26,7 @@ C._startVoting = function _startVoting() {

async.series([
startTimeout,
incrementTerm,
incrementTermAndVoteToSelf,
requestVotes
]);

Expand All @@ -38,11 +37,14 @@ C._startVoting = function _startVoting() {
}

function onElectionTimeout() {
self.node.toState('candidate');
self.node.toState('candidate');
}

function incrementTerm(cb) {
self.node.commonState.persisted.currentTerm += 1;
function incrementTermAndVoteToSelf(cb) {
self.node.currentTerm(self.node.currentTerm()+1);
self.node.commonState.persisted.votedFor = self.node.id;
self.node.commonState.persisted.voteTerm = self.node.currentTerm();

cb();
}

Expand All @@ -58,7 +60,7 @@ C._startVoting = function _startVoting() {
}

var args = {
term: self.node.commonState.persisted.currentTerm,
term: self.node.currentTerm(),
candidateId: self.node.id,
lastLogIndex: self.node.commonState.persisted.log.length(),
lastLogTerm: lastLog && lastLog.term
Expand All @@ -69,10 +71,12 @@ C._startVoting = function _startVoting() {
}

function onBroadcastResponse(err, args) {
// TODO: what about the term update?
if (args && args.voteGranted) {
votedForMe ++;
verifyMajority();
} else if (args && args.reason == 'too soon') {
// 'too soon' means that some one must have seen the leader alive
setImmediate(function () { self.node.toState('follower'); cb(); });
}
}

Expand Down
1 change: 1 addition & 0 deletions lib/default_node_options.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
module.exports = {
standby: false,
delayElectionTimeout: 1000,
minElectionTimeout: 150,
maxElectionTimeout: 300,
heartbeatInterval: 50,
Expand Down
5 changes: 3 additions & 2 deletions lib/follower.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ function Follower(node, options) {
this.lastHeardFromLeader = undefined;
this.lastLeader = undefined;

this.node.startElectionTimeout();
// delay the initial election timeout to allow the leader to send a heart beat
setTimeout(function () { self.node.startElectionTimeout(); }, options.delayElectionTimeout);

this.node.commonState.persisted.votedFor = null;

Expand Down Expand Up @@ -72,7 +73,7 @@ F.onAppendEntries = function onAppendEntries(args, cb) {
self.lastLeader = args.leaderId;
self.lastHeardFromLeader = Date.now();

self.node.commonState.persisted.currentTerm = args.term;
self.node.currentTerm(args.term);

self.node.commonState.volatile.commitIndex = Math.min(
args.leaderCommit, self.node.commonState.persisted.log.length());
Expand Down
4 changes: 4 additions & 0 deletions lib/leader.js
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,10 @@ function Leader(node, options) {
if (peer) {
if (err) {
self.emit('warning', err);
if (err.timeout) {
peer.meta.disconnect();
peer.meta.connect();
}
}
else if (args && args.term > self.node.currentTerm()) {
self.node.currentTerm(args.term);
Expand Down
7 changes: 6 additions & 1 deletion lib/node.js
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,8 @@ N._join = function _join(peerDesc, metadata, connection) {
peer.on('connected', onPeerConnected);
peer.on('disconnected', onPeerDisconnected);
peer.on('connecting', onPeerConnecting);

peer.on('error', onError);

if (found) {
self.emit('reconnected', peer);
}
Expand All @@ -318,6 +319,10 @@ N._join = function _join(peerDesc, metadata, connection) {
}
}

function onError(err) {
self.emit('error', err);
}

function onPeerCall(type, args, cb) {
self.handlePeerCall(peer, type, args, cb);
}
Expand Down
6 changes: 5 additions & 1 deletion lib/peer.js
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,12 @@ function Peer(id, metadata, options, connection, node) {
}

function invoke(work, done) {

var calledback = false;
self._invoke.call(self, work.type, work.args, callback);

function callback() {

if (!calledback) {
calledback = true;
work.cb.apply(null, arguments);
Expand All @@ -53,9 +55,11 @@ function Peer(id, metadata, options, connection, node) {
}

function receive(message, cb) {

self.emit('call', message.type, message.args, callback);

function callback() {

message.cb.apply(null, arguments);
cb();
}
Expand Down Expand Up @@ -128,7 +132,7 @@ P._invoke = function _invoke(type, args, cb) {
var timeout;

if (this.node.options) {
setTimeout(onTimeout, this.node.options.commandTimeout);
timeout = setTimeout(onTimeout, this.node.options.commandTimeout);
}

this.emit('outgoing call', type, args);
Expand Down
3 changes: 2 additions & 1 deletion lib/state.js
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ S.onRequestVote = function onRequestVote(args, cb) {
if (args.term < currentTerm) {
callback(false);
}
else if (!state.votedFor || state.votedFor == args.candidateId) {
else if (!state.votedFor || (state.voteTerm < args.term) || state.votedFor == args.candidateId) {
var lastLog = state.log.last();
if (lastLog && lastLog.term < args.lastLogTerm) {
callback(true);
Expand All @@ -84,6 +84,7 @@ S.onRequestVote = function onRequestVote(args, cb) {
function callback(grant) {
if (grant) {
state.votedFor = args.candidateId;
state.voteTerm = args.term;
self.node.emit('vote granted', state.votedFor);
}
cb(null, {term: currentTerm, voteGranted: grant});
Expand Down