Skip to content

Commit 810c2c5

Browse files
Expiration Service created:
- K8s & Docker config - Added redis config - Listener & publisher - Added expiration-queue (bull)
1 parent 38e3f94 commit 810c2c5

File tree

15 files changed

+7190
-0
lines changed

15 files changed

+7190
-0
lines changed

expiration/.dockerignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
node_modules

expiration/Dockerfile

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
FROM node:alpine
2+
3+
WORKDIR /app
4+
COPY package.json .
5+
RUN npm install --only=prod
6+
COPY . .
7+
8+
CMD [ "npm", "start" ]

expiration/package-lock.json

Lines changed: 6888 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

expiration/package.json

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
{
2+
"name": "expiration",
3+
"version": "1.0.0",
4+
"description": "",
5+
"main": "index.js",
6+
"scripts": {
7+
"start": "ts-node-dev --poll src/index.ts",
8+
"test": "jest --watchAll --no-cache"
9+
},
10+
"jest": {
11+
"preset": "ts-jest",
12+
"testEnvironment": "node",
13+
"setupFilesAfterEnv": [
14+
"./src/test/setup.ts"
15+
]
16+
},
17+
"keywords": [],
18+
"author": "",
19+
"license": "ISC",
20+
"dependencies": {
21+
"@hn-hub/common": "^1.0.6",
22+
"@types/bull": "^3.14.0",
23+
"bull": "^3.15.0",
24+
"node-nats-streaming": "^0.3.2",
25+
"ts-node-dev": "^1.0.0-pre.49",
26+
"typescript": "^3.9.5"
27+
},
28+
"devDependencies": {
29+
"@types/jest": "^26.0.3",
30+
"jest": "^26.1.0",
31+
"ts-jest": "^26.1.1"
32+
}
33+
}
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
export const natsWrapper = {
2+
client: {
3+
publish: jest.fn().mockImplementation((subject: string, data: string, callback: () => void) => {
4+
callback();
5+
})
6+
}
7+
};
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
export const queueGroupName = 'expiration-service';
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
import { Listener, StoryCreatedEvent, Subjects } from '@hn-hub/common';
2+
import { Message } from 'node-nats-streaming';
3+
import { expirationQueue } from '../../queues/expiration-queue';
4+
import { queueGroupName } from './queue-group-name';
5+
6+
export class StoryCreatedListener extends Listener<StoryCreatedEvent> {
7+
subject: Subjects.StoryCreated = Subjects.StoryCreated;
8+
queueGroupName = queueGroupName;
9+
10+
async onMessage(data: StoryCreatedEvent['data'], msg: Message) {
11+
const delay = 60000; // 10min delay (600000 ms)
12+
console.log('Waiting this many milliseconds to process the Job: ', delay);
13+
const stories = data.story.map(v => v.storyId);
14+
await expirationQueue.add({
15+
stories
16+
}, {
17+
delay
18+
});
19+
20+
msg.ack();
21+
}
22+
}
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
import { ExpirationCompleteEvent, Publisher, Subjects } from '@hn-hub/common';
2+
3+
export class ExpirationCompletePublisher extends Publisher<ExpirationCompleteEvent> {
4+
subject: Subjects.ExpirationComplete = Subjects.ExpirationComplete;
5+
}

expiration/src/index.ts

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
2+
import { StoryCreatedListener } from './events/listeners/story-created-listener';
3+
import { natsWrapper } from './nats-wrapper';
4+
5+
const start = async () => {
6+
console.log('Expiration service starting...');
7+
8+
if (!process.env.NATS_CLIENT_ID) {
9+
throw new Error('NATS_CLIENT_ID must be defined');
10+
}
11+
if (!process.env.NATS_URL) {
12+
throw new Error('NATS_URL must be defined');
13+
}
14+
if (!process.env.NATS_CLUSTER_ID) {
15+
throw new Error('NATS_CLUSTER_ID must be defined');
16+
}
17+
18+
try {
19+
await natsWrapper.connect(
20+
process.env.NATS_CLUSTER_ID,
21+
process.env.NATS_CLIENT_ID,
22+
process.env.NATS_URL
23+
);
24+
25+
natsWrapper.client.on('close', () => {
26+
console.log('Expiration Service : Nats connection closed!');
27+
process.exit();
28+
});
29+
30+
process.on('SIGINT', () => natsWrapper.client.close());
31+
process.on('SIGTERM', () => natsWrapper.client.close());
32+
33+
new StoryCreatedListener(natsWrapper.client).listen();
34+
} catch (err) {
35+
console.error('Startup error => ', err);
36+
}
37+
38+
};
39+
40+
start();

expiration/src/nats-wrapper.ts

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
import nats, { Stan } from 'node-nats-streaming';
2+
3+
class NatsWrapper {
4+
private _client?: Stan;
5+
6+
get client() {
7+
if (!this._client) {
8+
throw new Error('Cannot access NATS client before connecting');
9+
}
10+
return this._client;
11+
}
12+
13+
connect(clusterId: string, clientId: string, url: string) {
14+
this._client = nats.connect(clusterId, clientId, { url });
15+
16+
return new Promise((resolve, reject) => {
17+
this.client.on('connect', () => {
18+
console.log('Connected to NATS');
19+
resolve();
20+
});
21+
this.client.on('error', (err) => {
22+
reject(err);
23+
});
24+
});
25+
}
26+
}
27+
28+
export const natsWrapper = new NatsWrapper();

0 commit comments

Comments
 (0)