Skip to content

Commit de078d2

Browse files
committed
Support for subscribe batching
1 parent d959522 commit de078d2

File tree

2 files changed

+109
-26
lines changed

2 files changed

+109
-26
lines changed

scsocket.js

Lines changed: 37 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -100,33 +100,13 @@ var SCSocket = function (id, server, socket) {
100100
var emptyMessageError = new InvalidMessageError('Received an empty message');
101101
Emitter.prototype.emit.call(self, 'error', emptyMessageError);
102102

103-
} else if (obj.event) {
104-
var eventName = obj.event;
105-
106-
if (self._localEvents[eventName] == null) {
107-
var response = new Response(self, obj.cid);
108-
self.server.verifyInboundEvent(self, eventName, obj.data, function (err, newEventData, ackData) {
109-
if (err) {
110-
response.error(err, ackData);
111-
} else {
112-
if (eventName == '#disconnect') {
113-
var disconnectData = newEventData || {};
114-
self._onSCClose(disconnectData.code, disconnectData.data);
115-
} else {
116-
if (self._autoAckEvents[eventName]) {
117-
if (ackData !== undefined) {
118-
response.end(ackData);
119-
} else {
120-
response.end();
121-
}
122-
Emitter.prototype.emit.call(self, eventName, newEventData);
123-
} else {
124-
Emitter.prototype.emit.call(self, eventName, newEventData, response.callback.bind(response));
125-
}
126-
}
127-
}
128-
});
103+
} else if (Array.isArray(obj)) {
104+
var len = obj.length;
105+
for (var i = 0; i < len; i++) {
106+
self._handleEventObject(obj[i]);
129107
}
108+
} else if (obj.event) {
109+
self._handleEventObject(obj);
130110
} else if (obj.rid != null) {
131111
// If incoming message is a response to a previously sent message
132112
var ret = self._callbackMap[obj.rid];
@@ -162,6 +142,37 @@ SCSocket.prototype._sendPing = function () {
162142
}
163143
};
164144

145+
SCSocket.prototype._handleEventObject = function (obj) {
146+
var self = this;
147+
148+
var eventName = obj.event;
149+
150+
if (self._localEvents[eventName] == null) {
151+
var response = new Response(self, obj.cid);
152+
self.server.verifyInboundEvent(self, eventName, obj.data, function (err, newEventData, ackData) {
153+
if (err) {
154+
response.error(err, ackData);
155+
} else {
156+
if (eventName == '#disconnect') {
157+
var disconnectData = newEventData || {};
158+
self._onSCClose(disconnectData.code, disconnectData.data);
159+
} else {
160+
if (self._autoAckEvents[eventName]) {
161+
if (ackData !== undefined) {
162+
response.end(ackData);
163+
} else {
164+
response.end();
165+
}
166+
Emitter.prototype.emit.call(self, eventName, newEventData);
167+
} else {
168+
Emitter.prototype.emit.call(self, eventName, newEventData, response.callback.bind(response));
169+
}
170+
}
171+
}
172+
});
173+
}
174+
};
175+
165176
SCSocket.prototype._resetPongTimeout = function () {
166177
var self = this;
167178

test/integration.js

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -411,5 +411,77 @@ describe('integration tests', function () {
411411
});
412412
});
413413
});
414+
415+
it('Should support subscription batching', function (done) {
416+
var port = 8016;
417+
server = socketClusterServer.listen(port, {
418+
authKey: serverOptions.authKey
419+
});
420+
server.on('connection', function (socket) {
421+
connectionHandler(socket);
422+
var isFirstMessage = true;
423+
socket.on('message', function (rawMessage) {
424+
if (isFirstMessage) {
425+
var data = JSON.parse(rawMessage);
426+
// All 20 subscriptions should arrive as a single message.
427+
assert.equal(data.length, 20);
428+
isFirstMessage = false;
429+
}
430+
});
431+
});
432+
433+
var subscribeMiddlewareCounter = 0;
434+
// Each subscription should pass through the middleware individually, even
435+
// though they were sent as a batch/array.
436+
server.addMiddleware(server.MIDDLEWARE_SUBSCRIBE, function (req, next) {
437+
subscribeMiddlewareCounter++;
438+
assert.equal(req.channel.indexOf('my-channel-'), 0);
439+
if (req.channel == 'my-channel-10') {
440+
assert.equal(JSON.stringify(req.data), JSON.stringify({foo: 123}));
441+
} else if (req.channel == 'my-channel-12') {
442+
// Block my-channel-12
443+
var err = new Error('You cannot subscribe to channel 12');
444+
err.name = 'UnauthorizedSubscribeError';
445+
next(err);
446+
return;
447+
}
448+
next();
449+
});
450+
451+
server.on('ready', function () {
452+
client = socketCluster.connect({
453+
hostname: clientOptions.hostname,
454+
port: port,
455+
multiplex: false
456+
});
457+
var channelList = [];
458+
for (var i = 0; i < 20; i++) {
459+
var subscribeOptions = {
460+
batch: true
461+
};
462+
if (i == 10) {
463+
subscribeOptions.data = {foo: 123};
464+
}
465+
channelList.push(
466+
client.subscribe('my-channel-' + i, subscribeOptions)
467+
);
468+
}
469+
channelList[12].on('subscribe', function (err) {
470+
throw new Error('The my-channel-12 channel should have been blocked by MIDDLEWARE_SUBSCRIBE');
471+
});
472+
channelList[12].on('subscribeFail', function (err) {
473+
assert.notEqual(err, null);
474+
assert.equal(err.name, 'UnauthorizedSubscribeError');
475+
});
476+
channelList[19].watch(function (data) {
477+
assert.equal(data, 'Hello!');
478+
assert.equal(subscribeMiddlewareCounter, 20);
479+
done();
480+
});
481+
channelList[0].on('subscribe', function () {
482+
client.publish('my-channel-19', 'Hello!');
483+
});
484+
});
485+
});
414486
});
415487
});

0 commit comments

Comments
 (0)