Skip to content

Commit 4ac9fb9

Browse files
authored
Merge pull request #19 from piercus/issue-16
Redesign
2 parents 0c3f2d9 + c549118 commit 4ac9fb9

File tree

6 files changed

+607
-190
lines changed

6 files changed

+607
-190
lines changed

README.md

Lines changed: 101 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
[![codecov](https://codecov.io/gh/piercus/step-function-worker/branch/master/graph/badge.svg)](https://codecov.io/gh/piercus/step-function-worker)
44

55
# step-function-worker
6+
67
Create a nodejs aws step-function worker/pooler easily :-)
78

89
## install
@@ -16,32 +17,48 @@ npm install step-function-worker
1617
#### Basic example
1718

1819
```javascript
19-
var fn = function(input, cb, heartbeat){
20+
const fn = function(input, cb, heartbeat){
2021
// do something
2122
doSomething(input)
2223

23-
// call heartbeat sometime to avoid timeout
24+
// call heartbeat to avoid timeout
2425
heartbeat()
2526

2627
// call callback in the end
2728
cb(null, {"foo" : "bar"}); // output must be compatible with JSON.stringify
2829
};
2930

30-
var worker = new StepFunctionWorker({
31+
const worker = new StepFunctionWorker({
3132
activityArn : '<activity-ARN>',
3233
workerName : 'workerName',
3334
fn : fn,
34-
concurrency : 2 // default is 1
35+
taskConcurrency : 22, // default is null = Infinity
36+
poolConcurrency : 2 // default is 1
3537
});
3638
```
39+
40+
### Concurrency management
41+
42+
Since version **3.0**, `concurrency` has been replaced by `poolConcurrency` and `taskConcurrency`.
43+
44+
* `taskConcurrency` (`null` means Infinite)
45+
46+
It represent the maximum number of parallel tasks done by the worker (default: `null`).
47+
48+
* `poolConcurrency` is the maximum number of parallel getActivity, http request (see [`sdk.getActivity`](https://docs.aws.amazon.com/AWSJavaScriptSDK/latest/AWS/StepFunctions.html#getActivityTask-property)) (default: `1`)
49+
50+
Increase this to have a more responsive worker.
51+
52+
Anyway, you should always have `poolConcurrency` < `taskConcurrency`.
53+
3754
#### Set the Region
3855

3956
By default, this package is built on top of `aws-sdk` so you should set your AWS Region by changing `AWS_REGION` environment variable.
4057

4158
If you want to set it in JS code directly you can do it using `awsConfig` (see https://docs.aws.amazon.com/AWSJavaScriptSDK/latest/AWS/Config.html to see all available options) like
4259

43-
```
44-
var worker = new StepFunctionWorker({
60+
```javascript
61+
const worker = new StepFunctionWorker({
4562
activityArn : '<activity-ARN>',
4663
workerName : 'workerName',
4764
fn : fn,
@@ -61,36 +78,114 @@ worker.close(function(){
6178
})
6279
```
6380

81+
#### Get info on current worker
82+
83+
```javascript
84+
// A worker as multiple poolers and multiple running tasks
85+
// You can have infos about it by doing
86+
const {poolers, tasks} = worker.report();
87+
88+
// poolers is an array of {
89+
// startTime: <Date>,
90+
// workerName: <String>,
91+
// status: <String>
92+
// }
93+
//
94+
// tasks is an array of {
95+
// taskToken: <String>,
96+
// input: <Object>,
97+
// startTime: <Date>
98+
// }
99+
//
100+
```
101+
102+
#### Custom logging with winston
103+
104+
You can customize logging by using a [winston](https://www.npmjs.com/package/winston) logger (or winston-like logger) as input
105+
106+
```javascript
107+
const winston = require('winston');
108+
109+
const logger = winston.createLogger({
110+
level: 'debug',
111+
format: winston.format.json(),
112+
defaultMeta: { service: 'user-service' },
113+
transports: [
114+
//
115+
// - Write to all logs with level `info` and below to `combined.log`
116+
// - Write all logs error (and below) to `error.log`.
117+
//
118+
new winston.transports.File({ filename: 'error.log', level: 'error' }),
119+
new winston.transports.File({ filename: 'combined.log' })
120+
]
121+
});
122+
123+
const worker = new StepFunctionWorker({
124+
activityArn : '<activity-ARN>',
125+
workerName : 'workerName',
126+
fn : fn,
127+
logger
128+
});
129+
```
130+
131+
Alternatively, you can just use a winston-like logger
132+
133+
```javascript
134+
const logger = console;
135+
136+
const worker = new StepFunctionWorker({
137+
activityArn : '<activity-ARN>',
138+
workerName : 'workerName',
139+
fn : fn,
140+
logger
141+
});
142+
```
143+
64144
#### Events
65145

66146

67147
```javascript
148+
// when a task starts
68149
worker.on('task', function(task){
69150
// task.taskToken
70151
// task.input
71152
console.log("task ", task.input)
72153
});
73154

155+
// when a task fails
74156
worker.on('failure', function(failure){
75157
// out.error
76158
// out.taskToken
77159
console.log("Failure :",failure.error)
78160
});
79161

162+
// when a heartbeat signal is sent
80163
worker.on('heartbeat', function(beat){
81164
// out.taskToken
82165
console.log("Heartbeat");
83166
});
84167

168+
// when a task succeed
85169
worker.on('success', function(out){
86170
// out.output
87171
// out.taskToken
88172
console.log("Success :",out.output)
89173
});
90174

175+
// when an error happens
91176
worker.on('error', function(err){
92177
console.log("error ", err)
93178
});
179+
180+
// when the worker has no more task to process
181+
worker.on('empty', function(){
182+
console.log("error ", err)
183+
});
184+
185+
// when the worker reaches taskConcurrency tasks
186+
worker.on('full', function(err){
187+
console.log("error ", err)
188+
});
94189
```
95190

96191
### Documentation

lib/pooler.js

Lines changed: 67 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,4 @@
1-
const util = require('util');
2-
const {EventEmitter} = require('events');
3-
const Task = require('./task.js');
1+
const crypto = require('crypto');
42

53
/**
64
* @class Pooler
@@ -11,35 +9,27 @@ const Task = require('./task.js');
119
* */
1210

1311
function Pooler(options) {
14-
EventEmitter.call(this);
15-
16-
this._running = true;
17-
this._task = false;
12+
this.id = crypto.randomBytes(3).toString('hex');
1813
this.logger = options.logger;
14+
this.startTime = new Date();
1915
this.activityArn = options.activityArn;
2016
this.worker = options.worker;
2117
this.index = options.index;
2218
this.workerName = options.workerName && (options.workerName + '-' + this.index);
23-
this._request = null;
24-
this.pool();
19+
this.logger.debug(`new pooler ${this.id}`);
20+
this.getActivityTask();
2521
}
2622

27-
Pooler.prototype.stop = function (cb) {
28-
this._running = false;
29-
if (this._task) {
30-
this._task.removeAllListeners();
31-
}
23+
Pooler.prototype.stop = function () {
24+
this.logger.debug(`Pooler (${this.id}): Stop`);
3225

33-
if (this._request) {
34-
this.on('stopPooling', () => {
35-
this.removeAllListeners();
36-
cb();
26+
if (!this._stoppingPromise) {
27+
this._stoppingPromise = (this._requestPromise || Promise.resolve()).then(() => {
28+
this._stopped = true;
3729
});
38-
// This would be better approach but it does not seem to work
39-
// this._request.abort();
40-
} else {
41-
cb();
4230
}
31+
32+
return this._stoppingPromise;
4333
};
4434

4535
/**
@@ -54,68 +44,74 @@ Pooler.prototype.stop = function (cb) {
5444
*/
5545
Pooler.prototype.report = function () {
5646
return {
57-
workerName: this.workerName,
58-
status: (this._task ? 'Task under going' : (this._running ? 'Waiting for Tasks' : 'Paused')),
59-
task: this._task && this._task.report()
47+
id: this.id,
48+
startTime: this.startTime,
49+
status: (this._stopped ? 'Stopped' : 'Running')
6050
};
6151
};
6252

6353
Pooler.prototype.restart = function () {
64-
this._running = true;
65-
this.pool();
54+
return this.stop().then(() => {
55+
this._stopped = false;
56+
this.getActivityTask();
57+
return Promise.resolve();
58+
});
6659
};
6760

68-
Pooler.prototype.pool = function () {
69-
if (this._running) {
70-
if (this._task) {
71-
throw (new Error('pool should not be called when task on going'));
72-
}
73-
74-
if (this._request) {
75-
throw (new Error('pool should not be called when request on going'));
76-
}
61+
Pooler.prototype.getActivityTask = function () {
62+
// This.logger.info('getActivityTask');
7763

78-
this.getActivityTask();
79-
} else {
80-
this.emit('stopPooling');
64+
// this.logger.debug(this.workerName + ' getActivityTask ' + this.activityArn);
65+
if (this._stopped) {
66+
return Promise.reject(new Error(`Pooler (${this.id}) is stopped`));
8167
}
82-
};
8368

84-
Pooler.prototype.getActivityTask = function () {
85-
this.logger.debug(this.workerName + ' getActivityTask ' + this.activityArn);
86-
this._request = this.worker.stepfunction.getActivityTask({
87-
activityArn: this.activityArn,
88-
workerName: this.workerName
89-
}, (err, data) => {
90-
this._request = null;
91-
if (err) {
69+
if (!this._requestPromise) {
70+
this.logger.debug(`Pooler (${this.id}): getActivityTask`);
71+
72+
this._requestPromise = this.worker.stepfunction.getActivityTask({
73+
activityArn: this.activityArn,
74+
workerName: this.workerName
75+
}).promise()
76+
.then(data => {
77+
if (data.taskToken && typeof (data.taskToken) === 'string' && data.taskToken.length > 1) {
78+
this.logger.debug(`Pooler (${this.id}): Activity task received (${data.taskToken.slice(0, 10)})`);
79+
const params = Object.assign({}, data, {
80+
input: JSON.parse(data.input),
81+
workerName: this.workerName,
82+
poolerId: this.id
83+
});
84+
return this.worker.addTask(params);
85+
}
86+
87+
this.logger.debug(`Pooler (${this.id}): No activity task received`);
88+
return Promise.resolve();
89+
})
90+
.then(() => {
91+
this._requestPromise = null;
92+
const renewal = this.worker.renewPooler(this);
93+
if (!renewal) {
94+
this.stop();
95+
this.worker.removePooler(this);
96+
return Promise.resolve();
97+
}
98+
99+
return this.getActivityTask();
100+
})
101+
.catch(error => {
92102
// Console.log(err);
93-
if (err.code === 'RequestAbortedError') {
103+
this.logger.error(`Pooler (${this.id}):`, error);
104+
if (error.code === 'RequestAbortedError') {
94105
// In case of abort, close silently
95-
} else {
96-
this.emit('error', err);
97-
}
98-
99-
return;
100-
}
101-
102-
if (data.taskToken && typeof (data.taskToken) === 'string' && data.taskToken.length > 1) {
103-
const params = Object.assign({}, data, {input: JSON.parse(data.input), workerName: this.workerName});
104-
105-
this.worker.emit('task', params);
106-
107-
this._task = new Task(Object.assign({}, params, {worker: this.worker, logger: this.logger}));
106+
} else {
107+
this.worker.emit('error', error);
108+
}
108109

109-
this._task.once('finish', () => {
110-
this._task = null;
111-
this.pool();
110+
// Return Promise.reject(err);
112111
});
113-
} else {
114-
this.pool();
115-
}
116-
});
117-
};
112+
}
118113

119-
util.inherits(Pooler, EventEmitter);
114+
return this._requestPromise;
115+
};
120116

121117
module.exports = Pooler;

0 commit comments

Comments
 (0)