Skip to content

Update event queue to support readahead to FireFly core #56

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Apr 1, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -137,9 +137,9 @@ This will make it possible for the organizations to establish MTLS communication
|message-delivered| Emitted to the sender when a message has been delivered | recipient, message, requestId (optional)
|message-failed | Emitted to the sender when a message could not be delivered| recipient, message, requestId (optional)

- After receiving a websocket message, a commit must be sent in order to receive the next one:
- After receiving a websocket message, an ack must be sent ("commit" is a synonym for "ack"):
```
{ "action": "commit" }
{ "action": "ack", "id": "<ID_FROM_EVENT>" }
```
- Messages arrive in the same order they were sent
- Up to 1,000 messages will be queued
Expand Down
936 changes: 438 additions & 498 deletions package-lock.json

Large diffs are not rendered by default.

34 changes: 17 additions & 17 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -20,40 +20,40 @@
"author": "",
"license": "Apache-2.0",
"dependencies": {
"ajv": "^8.8.2",
"axios": "^0.24.0",
"busboy": "^0.3.1",
"express": "^4.17.2",
"ajv": "^8.11.0",
"axios": "^0.26.1",
"busboy": "^1.5.0",
"express": "^4.17.3",
"form-data": "^4.0.0",
"jsrsasign": "^10.5.1",
"jsrsasign": "^10.5.14",
"swagger-ui-express": "^4.3.0",
"uuid": "^8.3.2",
"ws": "^8.4.0",
"ws": "^8.5.0",
"yamljs": "^0.3.0"
},
"devDependencies": {
"@types/bunyan": "^1.8.8",
"@types/busboy": "^0.3.1",
"@types/busboy": "^1.5.0",
"@types/chai": "^4.3.0",
"@types/express": "^4.17.13",
"@types/jsrsasign": "^9.0.3",
"@types/mocha": "^9.0.0",
"@types/node": "^17.0.8",
"@types/jsrsasign": "^10.2.1",
"@types/mocha": "^9.1.0",
"@types/node": "^17.0.23",
"@types/swagger-ui-express": "^4.1.3",
"@types/uuid": "^8.3.3",
"@types/ws": "^8.2.2",
"@types/uuid": "^8.3.4",
"@types/ws": "^8.5.3",
"@types/yamljs": "^0.2.31",
"chai": "^4.3.4",
"mocha": "^9.1.3",
"chai": "^4.3.6",
"mocha": "^9.2.2",
"moment": "^2.29.1",
"nyc": "^15.1.0",
"rimraf": "^3.0.2",
"sinon": "^12.0.1",
"sinon": "^13.0.1",
"sinon-chai": "^3.7.0",
"source-map-support": "^0.5.21",
"ts-node": "^10.4.0",
"ts-node": "^10.7.0",
"ts-sinon": "^2.0.2",
"typescript": "^4.5.4"
"typescript": "^4.6.3"
},
"nyc": {
"extension": [
Expand Down
24 changes: 9 additions & 15 deletions src/app.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,16 @@ import path from 'path';
import swaggerUi from 'swagger-ui-express';
import WebSocket from 'ws';
import YAML from 'yamljs';
import { eventEmitter as blobsEventEmitter } from './handlers/blobs';
import * as eventsHandler from './handlers/events';
import { eventEmitter as messagesEventEmitter } from './handlers/messages';
import { genTLSContext, init as initCert, loadPeerCAs } from './lib/cert';
import { config, init as initConfig } from './lib/config';
import { IAckEvent } from './lib/interfaces';
import { Logger } from './lib/logger';
import RequestError, { errorHandler } from './lib/request-error';
import * as utils from './lib/utils';
import { router as apiRouter, setRefreshCACerts } from './routers/api';
import { eventEmitter as p2pEventEmitter, router as p2pRouter } from './routers/p2p';
import { router as p2pRouter } from './routers/p2p';
import { init as initEvents } from './handlers/events';

const log = new Logger("app.ts");

Expand All @@ -51,6 +51,7 @@ setRefreshCACerts(refreshCACerts)
export const start = async () => {
await initConfig();
await initCert();
await initEvents(config);

const apiApp = express();
apiServer = http.createServer(apiApp);
Expand All @@ -68,11 +69,7 @@ export const start = async () => {
}
});

p2pEventEmitter.addListener('event', event => eventsHandler.queueEvent(event));
blobsEventEmitter.addListener('event', event => eventsHandler.queueEvent(event));
messagesEventEmitter.addListener('event', event => eventsHandler.queueEvent(event));

eventsHandler.eventEmitter.addListener('event', event => {
eventsHandler.getEmitter().addListener('event', event => {
log.info(`Event emitted ${event.type}/${event.id}`)
if (delegatedWebSocket !== undefined) {
delegatedWebSocket.send(JSON.stringify(event));
Expand All @@ -82,21 +79,16 @@ export const start = async () => {
const assignWebSocketDelegate = (webSocket: WebSocket) => {
log.info('New WebSocket delegate assigned');
delegatedWebSocket = webSocket;
const event = eventsHandler.getCurrentEvent();
webSocket.on('message', async message => {
try {
const messageContent = JSON.parse(message.toLocaleString());
if (messageContent.action === 'commit') {
log.info(`Event comitted ${event?`${event.type}/${event.id}`:`[no event in flight]`}`)
eventsHandler.handleCommit();
if (messageContent.action === 'ack' || messageContent.action == 'commit') {
eventsHandler.handleAck(messageContent as IAckEvent);
}
} catch (err) {
log.error(`Failed to process websocket message ${err}`);
}
});
if (event !== undefined) {
webSocket.send(JSON.stringify(event));
}
webSocket.on('close', () => {
log.info('WebSocket delegate disconnected');
const nextDelegatedWebSocket = wss.clients.values().next().value;
Expand All @@ -106,6 +98,8 @@ export const start = async () => {
delegatedWebSocket = undefined;
}
});
// Anything that's in-flight needs to be sent again
eventsHandler.reDispatchInFlight();
};

wss.on('connection', (webSocket: WebSocket) => {
Expand Down
7 changes: 3 additions & 4 deletions src/handlers/blobs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
// limitations under the License.

import crypto from 'crypto';
import EventEmitter from 'events';
import FormData from 'form-data';
import { createReadStream, createWriteStream, promises as fs } from 'fs';
import https from 'https';
Expand All @@ -27,12 +26,12 @@ import { BlobTask, IBlobDeliveredEvent, IBlobFailedEvent, IFile, IMetadata } fro
import { Logger } from '../lib/logger';
import RequestError from '../lib/request-error';
import * as utils from '../lib/utils';
import { queueEvent } from './events';

const log = new Logger("handlers/blobs.ts")

let blobQueue: BlobTask[] = [];
let sending = false;
export const eventEmitter = new EventEmitter();

export const retreiveBlob = async (filePath: string) => {
const resolvedFilePath = path.join(utils.constants.DATA_DIRECTORY, utils.constants.BLOBS_SUBDIRECTORY, filePath);
Expand Down Expand Up @@ -100,7 +99,7 @@ export const deliverBlob = async ({ blobPath, recipient, recipientURL, requestId
timeout: utils.constants.REST_API_CALL_BLOB_REQUEST_TIMEOUT,
httpsAgent
});
eventEmitter.emit('event', {
await queueEvent({
id: uuidV4(),
type: 'blob-delivered',
path: blobPath,
Expand All @@ -109,7 +108,7 @@ export const deliverBlob = async ({ blobPath, recipient, recipientURL, requestId
} as IBlobDeliveredEvent);
log.trace(`Blob delivered`);
} catch (err: any) {
eventEmitter.emit('event', {
await queueEvent({
id: uuidV4(),
type: 'blob-failed',
path: blobPath,
Expand Down
123 changes: 102 additions & 21 deletions src/handlers/events.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,39 +15,120 @@
// limitations under the License.

import EventEmitter from "events";
import { OutboundEvent } from "../lib/interfaces";
import { IAckEvent, IConfig, OutboundEvent } from "../lib/interfaces";
import { Logger } from "../lib/logger";
import * as utils from '../lib/utils';

const log = new Logger("handlers/events.ts")

let eventQueue: OutboundEvent[] = [];
export const eventEmitter = new EventEmitter();
let maxInflight = utils.constants.DEFAULT_MAX_INFLIGHT;
let maxEventQueueSize = utils.constants.MAX_EVENT_QUEUE_SIZE;
let eventQueue: OutboundEvent[];
let inFlight: OutboundEvent[];

export const queueEvent = (socketEvent: OutboundEvent) => {
if(eventQueue.length < utils.constants.MAX_EVENT_QUEUE_SIZE) {
eventQueue.push(socketEvent);
if(eventQueue.length === 1) {
eventEmitter.emit('event', eventQueue[0]);
let eventEmitter: EventEmitter;
let unblockPromise: Promise<void> | undefined;
let unblock: (() => void) | undefined;

export const init = async (config: IConfig) => {
eventEmitter = new EventEmitter();
eventQueue = [];
inFlight = [];
unblockPromise = undefined;
unblock = undefined;
if (config.events?.maxInflight !== undefined) {
maxInflight = config.events.maxInflight;
}
if (config.events?.queueSize !== undefined) {
maxEventQueueSize = config.events.queueSize;
}
}

const dispatchNext = () => {
if (inFlight.length < maxInflight) {
const event = eventQueue.shift();
if (event) {
inFlight.push(event)
log.debug(`${event.id}: dispatched`);
eventEmitter.emit('event', event);
}
} else {
log.warn('Max queue size reached');
}
};

export const handleCommit = () => {
eventQueue.shift();
if(eventQueue.length > 0) {
eventEmitter.emit('event', eventQueue[0]);
if (eventQueue.length < maxEventQueueSize && unblock) {
unblock();
unblockPromise = undefined;
unblock = undefined;
log.info(`Event queue unblocked (length=${eventQueue.length})`);
}
}

export const getCurrentEvent = () => {
if(eventQueue.length > 0) {
return eventQueue[0];
export const queueEvent = async (socketEvent: OutboundEvent) => {

let currentUnblockPromise = unblockPromise;
if (currentUnblockPromise) {
let blockedTime = Date.now();
log.warn(`${socketEvent.id}: delaying receive due to full event queue (length=${eventQueue.length})`);
await currentUnblockPromise;
log.info(`${socketEvent.id}: unblocked receive after ${Date.now()-blockedTime}ms`);
}
};

export const getQueueSize = () => {
return eventQueue.length;
eventQueue.push(socketEvent);
if (eventQueue.length >= maxEventQueueSize && !unblockPromise) {
log.warn(`Event queue became full (length=${eventQueue.length})`);
unblockPromise = new Promise(resolve => {
unblock = resolve;
})
}

dispatchNext();
};

export const reDispatchInFlight = () => {
for (const event of inFlight) {
eventEmitter.emit('event', event)
}
}

export const handleAck = (ack: IAckEvent) => {

// Check we have something in-flight
if (inFlight.length <= 0) {
log.error(`Ack for ${ack.id} while no events in-flight`);
return
}

// If no ID supplied (back-level API) we
if (ack.id === undefined) {
log.warn(`FireFly core is back-level and did not supply an event ID`);
ack.id = inFlight[0].id;
}

// Remove from our in-flight map
let event;
for (let i = 0; i < inFlight.length; i++) {
const candidate = inFlight[i]
if (ack.id === candidate.id) {
event = candidate;
inFlight.splice(i, 1);
break;
}
}
if (!event) {
log.warn(`Ack received for ${ack.id} which is not in-flight`);
return
}
log.debug(`${ack.id}: acknowledged`);

dispatchNext();
}

export const getEmitter = () => {
return eventEmitter;
}

export const getStats = () => {
return {
messageQueueSize: eventQueue.length,
inFlightCount: inFlight.length,
}
}
7 changes: 3 additions & 4 deletions src/handlers/messages.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,20 +14,19 @@
// See the License for the specific language governing permissions and
// limitations under the License.

import EventEmitter from 'events';
import FormData from 'form-data';
import https from 'https';
import { v4 as uuidV4 } from 'uuid';
import { ca, cert, key } from '../lib/cert';
import { IMessageDeliveredEvent, IMessageFailedEvent, MessageTask } from '../lib/interfaces';
import { Logger } from '../lib/logger';
import * as utils from '../lib/utils';
import { queueEvent } from './events';

const log = new Logger('handlers/messages.ts')

let messageQueue: MessageTask[] = [];
let sending = false;
export const eventEmitter = new EventEmitter();

export const sendMessage = async (message: string, recipient: string, recipientURL: string, requestId: string | undefined) => {
if (sending) {
Expand Down Expand Up @@ -55,7 +54,7 @@ export const deliverMessage = async ({ message, recipient, recipientURL, request
headers: formData.getHeaders(),
httpsAgent
});
eventEmitter.emit('event', {
await queueEvent({
id: uuidV4(),
type: 'message-delivered',
message,
Expand All @@ -64,7 +63,7 @@ export const deliverMessage = async ({ message, recipient, recipientURL, request
} as IMessageDeliveredEvent);
log.trace(`Message delivered`);
} catch(err: any) {
eventEmitter.emit('event', {
await queueEvent({
id: uuidV4(),
type: 'message-failed',
message,
Expand Down
2 changes: 1 addition & 1 deletion src/lib/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,8 @@ const loadConfig = async () => {
throw err;
}
}
config = data as IConfig;
if(validateConfig(data)) {
config = data as IConfig;
for(const peer of config.peers) {
if(peer.endpoint.endsWith('/')) {
peer.endpoint = peer.endpoint.slice(-0, -1);
Expand Down
Loading