Skip to content

Commit aa202b6

Browse files
author
taoyuan
committed
added amqp client and server
1 parent d940731 commit aa202b6

File tree

8 files changed

+376
-4
lines changed

8 files changed

+376
-4
lines changed

index.js

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,4 +3,7 @@
33
var rayson = module.exports = require('jayson');
44

55
rayson.client.mqtt = require('./lib/mqtt/mqtt-client');
6+
rayson.client.amqp = require('./lib/amqp/amqp-client');
7+
68
rayson.server.interfaces.mqtt = require('./lib/mqtt/mqtt-server');
9+
rayson.server.interfaces.amqp = require('./lib/amqp/amqp-server');

lib/amqp/amqp-base.js

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
"use strict";
2+
3+
var assert = require('assert');
4+
var amqper = require('amqper');
5+
6+
module.exports = AMQPBase;
7+
8+
function isAMQPClient(obj) {
9+
return obj instanceof amqper.Client;
10+
}
11+
12+
/**
13+
* @param {Object|String} amqpclient The real mqtt client or connection url or the options.
14+
* @param {Object|String} [options] The topic or options.
15+
* @param {String} [options.url] the connection url.
16+
* @param {String} [options.exchange] The exchange string. 'amq.topic' is default.
17+
* @param {String} options.topic The service topic. should be like `hello/:param1/:param2/service`
18+
* @param {String} [options.format] The codec for encode and decode the message by default mqtt client. Cloud be `json` or
19+
* `msgpack`, default is `json`.
20+
* @param {Number} [options.timeout] The callback cache timeout. 10 seconds is default.
21+
* @param {Boolean|Number} [options.scan] The interval time scan the callback cache for timeout. 10 seconds is default.
22+
* @return {AMQPBase}
23+
* @api public
24+
*/
25+
function AMQPBase(amqpclient, options) {
26+
27+
if (isAMQPClient(amqpclient)) {
28+
this.client = amqpclient;
29+
} else {
30+
this.client = amqper.connect(amqpclient);
31+
}
32+
33+
var client = this.client;
34+
if (client !== amqpclient && !options) {
35+
options = amqpclient;
36+
}
37+
38+
if (typeof options === 'string') {
39+
options = {topic: options};
40+
}
41+
42+
this.options = options = options || {};
43+
options.topic = options.topic || options.routingKey;
44+
assert(typeof this.options.topic === 'string', 'options.topic is required');
45+
46+
options.exchange = options.exchange || 'amq.topic';
47+
48+
// transform topic
49+
options.topic = options.topic.replace(/\//g, '.');
50+
options.queue = options.queue || options.topic.replace('/\:/g', '').replace('/\./g', '_');
51+
52+
// init format
53+
client.format(options.format);
54+
55+
this.format = function (fmt) {
56+
client.format(fmt);
57+
};
58+
59+
this.ready = function (cb) {
60+
client.ready(cb);
61+
};
62+
63+
this.close = function (done) {
64+
client.close(done);
65+
};
66+
}

lib/amqp/amqp-client.js

Lines changed: 135 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,135 @@
1+
"use strict";
2+
3+
var debug = require('debug')('rayson:amqp:client');
4+
var _ = require('lodash');
5+
var NodeCache = require("node-cache");
6+
var Client = require('jayson').Client;
7+
var AMQPBase = require('../amqp/amqp-base');
8+
var errors = require('../errors');
9+
10+
module.exports = AMQPClient;
11+
12+
/**
13+
* Constructor for a Rayson MQTT Client
14+
* @class Rayson JSON-RPC MQTT Client
15+
* @constructor
16+
* @extends Client
17+
* @param {Object|String} amqpclient The real mqtt client or connection url or the options.
18+
* @param {Object|String} [options] The topic or options.
19+
* @param {String} [options.url] the connection url.
20+
* @param {String} [options.exchange] The exchange string. 'amq.topic' is default.
21+
* @param {String} [options.topic] The service topic. should be like `hello/:param1/:param2/service`
22+
* @param {String} [options.format] The codec for encode and decode the message by default mqtt client. C
23+
* loud be `json` or `msgpack`, default is `json`.
24+
* @param {Number} [options.timeout] The callback cache timeout. 10 seconds is default.
25+
* @param {Boolean|Number} [options.cacheCheckInterval] The interval time scan the callback cache for timeout.
26+
* 10 seconds is default.
27+
* @return {AMQPClient}
28+
* @api public
29+
*/
30+
function AMQPClient(amqpclient, options) {
31+
if (!(this instanceof AMQPClient)) return new AMQPClient(amqpclient, options);
32+
33+
Client.call(this);
34+
AMQPBase.call(this, amqpclient, options);
35+
36+
var that = this;
37+
options = this.options;
38+
39+
options.ttl = options.ttl || 10; // seconds
40+
options.cacheCheckInterval = options.cacheCheckInterval || 12;
41+
42+
var params = this.params = options.topic.match(/\:[a-zA-Z0-9]+/g);
43+
var values = this.values = {};
44+
45+
if (params) {
46+
// transform the params
47+
this.params = _.map(params, function (param) {
48+
param = param.substring(1);
49+
that[param] = function (value) {
50+
values[param] = value;
51+
return that;
52+
};
53+
return param;
54+
});
55+
}
56+
57+
// init callback cache
58+
this.cache = new NodeCache({stdTTL: options.ttl, checkperiod: options.cacheCheckInterval});
59+
60+
// subscribe reply topic
61+
var client = this.client;
62+
var replyTo = options.topic + '.reply';
63+
options.queue = options.queue + '_reply';
64+
client.$promise.then(function () {
65+
debug('subscribe reply to topic:', replyTo);
66+
client.subscribe(replyTo, options, getResponseHandler(that.cache));
67+
});
68+
}
69+
70+
require('util').inherits(AMQPClient, Client);
71+
72+
AMQPClient.prototype._request = function (request, callback) {
73+
var topic = this.options.topic;
74+
if (this.params) {
75+
// validate topic params
76+
for (var i = 0; i < this.params.length; i++) {
77+
var param = this.params[i];
78+
var value = this.values[param];
79+
if (value === undefined || value === null) {
80+
throw new Error('Missing topic param `' + param + '`');
81+
}
82+
topic = topic.replace(':' + param, value);
83+
}
84+
}
85+
86+
debug('publish request', topic, request);
87+
var that = this;
88+
var cache = this.cache;
89+
var ok = this.client.publish(this.options.exchange, topic, request).then(function () {
90+
if (callback) {
91+
cache.set(request.id, {cb: callback});
92+
}
93+
}, function (err) {
94+
if (callback) callback(err);
95+
});
96+
97+
request.timeout = function (ms) {
98+
ok.then(function () {
99+
var item = cache.get(request.id);
100+
if (!item) return request;
101+
if (item.timer) clearTimeout(item.timer);
102+
item.timer = setTimeout(getTimeoutHandler(that.cache, request.id, ms), ms);
103+
});
104+
return request;
105+
}
106+
};
107+
108+
function getResponseHandler(cache) {
109+
return function (message) {
110+
var payload = message.payload;
111+
if (!payload || !payload.id) return debug('invalid response', payload);
112+
113+
debug('response #' + payload.id, payload.error ? payload.error : '-', payload.result ? payload.result : '-');
114+
115+
var item = cache.get(payload.id);
116+
cache.del(payload.id);
117+
if (!item) return debug('no cache callback for response #' + payload.id, '. maybe it\'s timeout.');
118+
119+
if (item.timer) {
120+
clearTimeout(item.timer);
121+
item.timer = null;
122+
}
123+
item.cb(null, payload);
124+
}
125+
}
126+
127+
function getTimeoutHandler(cache, id, ms) {
128+
return function () {
129+
var item = cache.get(id);
130+
if (!item) return;
131+
cache.del(id);
132+
debug('response #' + id, 'with timeout error after ' + ms + 'ms');
133+
item.cb(new errors.TimeoutError('Request timed out after ' + ms + 'ms'));
134+
}
135+
}

lib/amqp/amqp-server.js

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
"use strict";
2+
3+
var debug = require('debug')('rayson:amqp:server');
4+
var path = require('path');
5+
var _ = require('lodash');
6+
var AMQPBase = require('./amqp-base');
7+
8+
module.exports = AMQPServer;
9+
10+
/**
11+
* Constructor for a Jayson MQTT server
12+
* @param {jayson.Server} server Server instance
13+
* @param {Object|String} amqpclient The mqtt.js client or connection url or the options.
14+
* @param {Object|String} [options] The topic or options.
15+
* @param {String} options.topic The service topic. should be like `hello/:param1/:param2/service`
16+
* @param {String} [options.format] The codec for encode and decode the message by default mqtt client. Cloud be `json` or
17+
* `msgpack`, default is `json`.
18+
* @return {AMQPServer}
19+
* @api public
20+
*/
21+
function AMQPServer(server, amqpclient, options) {
22+
if (!(this instanceof AMQPServer)) return new AMQPServer(server, amqpclient, options);
23+
24+
AMQPBase.call(this, amqpclient, options);
25+
26+
this.options = options = _.defaults(this.options, server.options);
27+
28+
var that = this;
29+
var client = this.client;
30+
this.$promise = client.$promise.then(function () {
31+
debug('subscribe', options.topic);
32+
return client.subscribe(options.topic, options, getRequestHandler(server, that));
33+
});
34+
}
35+
36+
AMQPServer.prototype.ready = function (cb) {
37+
return this.$promise.then(function () {
38+
if (cb) cb();
39+
})
40+
};
41+
42+
function getRequestHandler(server, mqttserver) {
43+
return function (message) {
44+
var payload = message.payload;
45+
debug('received request', payload);
46+
server.call(payload, function (error, success) {
47+
var response = success;
48+
if (error) {
49+
response = {id: payload.id, jsonrpc: payload.jsonrpc, error: error}
50+
}
51+
var replyTo = message.fields.routingKey + '.reply';
52+
debug('publish reply', replyTo, response);
53+
mqttserver.client.publish(message.fields.exchange, replyTo, response);
54+
});
55+
}
56+
}

lib/mqtt/mqtt-client.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
var debug = require('debug')('jayson:client:mqtt');
1+
var debug = require('debug')('rayson:client:mqtt');
22
var path = require('path');
33
var _ = require('lodash');
44
var NodeCache = require("node-cache");

lib/mqtt/mqtt-server.js

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
"use strict";
22

3-
var debug = require('debug')('jayson:server:mqtt');
3+
var debug = require('debug')('rayson:server:mqtt');
44
var path = require('path');
55
var _ = require('lodash');
66
var MQTTBase = require('./mqtt-base');
@@ -9,7 +9,6 @@ module.exports = MQTTServer;
99

1010
/**
1111
* Constructor for a Jayson MQTT server
12-
* @extends require('net').Server
1312
* @param {jayson.Server} server Server instance
1413
* @param {Object|String} mqttclient The mqtt.js client or connection url or the options.
1514
* @param {Object|String} [options] The topic or options.

package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@
2424
"mosca"
2525
],
2626
"dependencies": {
27-
"amqper": "^0.1.3",
27+
"amqper": "^0.1.7",
2828
"debug": "^2.2.0",
2929
"expirable": "^0.1.0",
3030
"jayson": "^1.2.0",

0 commit comments

Comments
 (0)