Skip to content

Commit 0a15b89

Browse files
committed
fix: sync functions and sync errors
1 parent bffbd6e commit 0a15b89

File tree

3 files changed

+175
-67
lines changed

3 files changed

+175
-67
lines changed

lib/worker.js

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,13 @@ Worker.prototype.close = function (cb) {
131131
};
132132

133133
Worker.prototype.execute = function (input, cb, heartbeat) {
134-
this.fn(input, cb, heartbeat);
134+
setImmediate(() => {
135+
try {
136+
this.fn(input, cb, heartbeat);
137+
} catch (err) {
138+
cb(err);
139+
}
140+
});
135141
};
136142

137143
Worker.prototype.succeed = function (res) {

package-lock.json

Lines changed: 45 additions & 66 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

test/scenarios/sync-fn.js

Lines changed: 123 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,123 @@
1+
const test = require('ava').test;
2+
const AWS = require('aws-sdk');
3+
const StepFunctionWorker = require('../../index.js');
4+
const createActivity = require('../utils/create-activity');
5+
const cleanUp = require('../utils/clean-up');
6+
7+
const stepFunction = new AWS.StepFunctions();
8+
const workerName = 'test worker name';
9+
const stateMachineName = 'test-state-machine-' + Math.floor(Math.random() * 1000);
10+
const activityName = 'test-step-function-worker-' + Math.floor(Math.random() * 1000);
11+
12+
process.on('uncaughtException', err => {
13+
console.log('uncaughtException', err);
14+
});
15+
/*
16+
{
17+
definition: '{"Comment":"An Example State machine using Activity.","StartAt":"FirstState","States":{"FirstState":{"Type":"Task","Resource":"arn:aws:states:eu-central-1:170670752151:activity:test-step-function-worker","TimeoutSeconds":300,"HeartbeatSeconds":60,"Next":"End"}}}',
18+
name: 'test-state-machine',
19+
roleArn: 'arn:aws:iam::170670752151:role/service-role/StatesExecutionRole-eu-central-1'
20+
}
21+
*/
22+
23+
const context = {};
24+
25+
const before = createActivity.bind(null, {context, activityName, stateMachineName, workerName});
26+
const after = cleanUp.bind(null, context);
27+
28+
const sentInput = {foo: 'bar'};
29+
const sentOutput = {foo2: 'bar2'};
30+
31+
const fn = function (event, callback) {
32+
callback(null, sentOutput);
33+
};
34+
35+
const fnError = function () {
36+
throw (new Error('custom error'));
37+
};
38+
39+
test.before(before);
40+
41+
test.serial('Step function Activity Worker with 2 consecutive synchronous tasks', t => {
42+
const activityArn = context.activityArn;
43+
const stateMachineArn = context.stateMachineArn;
44+
45+
const worker = new StepFunctionWorker({
46+
activityArn,
47+
workerName: workerName + '-fn',
48+
fn
49+
});
50+
51+
return new Promise((resolve, reject) => {
52+
let expectedTaskToken;
53+
const params = {
54+
stateMachineArn,
55+
input: JSON.stringify(sentInput)
56+
};
57+
worker.once('task', task => {
58+
// Task.taskToken
59+
// task.input
60+
t.deepEqual(task.input, sentInput);
61+
t.is(typeof (task.taskToken), 'string');
62+
expectedTaskToken = task.taskToken;
63+
});
64+
worker.on('error', reject);
65+
worker.once('success', out => {
66+
t.is(out.taskToken, expectedTaskToken);
67+
68+
let expectedTaskToken2;
69+
worker.once('task', task => {
70+
// Task.taskToken
71+
// task.input
72+
expectedTaskToken2 = task.taskToken;
73+
});
74+
75+
worker.once('success', out => {
76+
t.is(out.taskToken, expectedTaskToken2);
77+
worker.close(() => {
78+
resolve();
79+
});
80+
});
81+
82+
stepFunction.startExecution(params).promise();
83+
});
84+
85+
stepFunction.startExecution(params).promise();
86+
});
87+
});
88+
89+
test.serial('Step function Activity Worker with synchronous failing task', t => {
90+
const activityArn = context.activityArn;
91+
const stateMachineArn = context.stateMachineArn;
92+
93+
const worker = new StepFunctionWorker({
94+
activityArn,
95+
workerName: workerName + '-fn',
96+
fn: fnError
97+
});
98+
99+
return new Promise((resolve, reject) => {
100+
let expectedTaskToken;
101+
const params = {
102+
stateMachineArn,
103+
input: JSON.stringify(sentInput)
104+
};
105+
worker.once('task', task => {
106+
// Task.taskToken
107+
// task.input
108+
t.deepEqual(task.input, sentInput);
109+
t.is(typeof (task.taskToken), 'string');
110+
expectedTaskToken = task.taskToken;
111+
});
112+
worker.once('failure', out => {
113+
t.is(out.taskToken, expectedTaskToken);
114+
t.is(out.error.message, 'custom error');
115+
worker.close(() => {
116+
resolve();
117+
});
118+
});
119+
worker.once('success', reject);
120+
stepFunction.startExecution(params).promise();
121+
});
122+
});
123+
test.after(after);

0 commit comments

Comments
 (0)