Skip to content

Commit b6522b8

Browse files
committed
(features) Add basic express ingestor
1 parent faec3ea commit b6522b8

File tree

5 files changed

+840
-0
lines changed

5 files changed

+840
-0
lines changed

ingestors/express/package.json

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
{
2+
"name": "@metlo/express-ingest",
3+
"description": "An ingestor for metlo api-security to support the express server",
4+
"version": "1.0.0",
5+
"main": "index.js",
6+
"license": "MIT",
7+
"scripts": {},
8+
"dependencies": {
9+
"express": "^4.0.0",
10+
"shimmer": "^1.2.1"
11+
},
12+
"repository": {
13+
"type": "git",
14+
"url": "https://github.com/metlo-labs/metlo.git"
15+
},
16+
"engines": {
17+
"node": ">=11.7.0"
18+
},
19+
"homepage": "www.metlo.com"
20+
}

ingestors/express/src/index.js

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
var shimmer = require('shimmer');
2+
var expr_resp = require('express/lib/response');
3+
var http = require('https')
4+
var os = require('node:os');
5+
6+
var WorkerPool = require('./pool');
7+
8+
9+
const pool = new WorkerPool(os.cpus().length, './workerTarget.js');
10+
11+
function exit() {
12+
pool.close()
13+
}
14+
15+
process.on('exit', exit);
16+
process.on('SIGTERM', exit);
17+
18+
19+
var metloDetails = {
20+
key: null,
21+
host: null
22+
}
23+
24+
25+
function compileInformation(_req, _res, responseBody) {
26+
const data = JSON.stringify({
27+
request: {
28+
url: {
29+
host: _req.hostname,
30+
path: _req.route.path,
31+
parameters: _req.query,
32+
},
33+
headers: _req.headers,
34+
body: _req.body || "No Body",
35+
method: _req.method,
36+
},
37+
response: {
38+
url: `${_req.socket.remoteAddress}:${_req.socket.remotePort}`,
39+
status: _res.statusCode,
40+
headers: _res.getHeaders(),
41+
body: responseBody,
42+
},
43+
meta: {
44+
environment: "production",
45+
incoming: true,
46+
source: _req.socket.remoteAddress,
47+
sourcePort: _req.socket.remotePort,
48+
// TODO : Add destination
49+
destination: "server.hostname",
50+
destinationPort: "server.port",
51+
}
52+
})
53+
54+
pool.runTask({ host: metloDetails.host, key: metloDetails.key, data }, (err, result) => {
55+
console.log(err, result);
56+
})
57+
}
58+
59+
shimmer.wrap(expr_resp, 'send', function (original) {
60+
return function () {
61+
var returned = original.apply(this, arguments)
62+
compileInformation(this.req, returned, arguments[arguments.length - 1])
63+
return returned;
64+
};
65+
});
66+
67+
shimmer.wrap(expr_resp, 'sendFile', function (original) {
68+
return function () {
69+
var returned = original.apply(this, arguments)
70+
compileInformation(this.req, returned, JSON.stringify(arguments[arguments.length - 1]))
71+
return returned;
72+
};
73+
});
74+
75+
module.exports = function (apiKey, metloHost) {
76+
metloDetails.host = metloHost
77+
metloDetails.key = apiKey
78+
}

ingestors/express/src/pool/index.js

Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
const { AsyncResource } = require('node:async_hooks');
2+
const { EventEmitter } = require('node:events');
3+
const path = require('node:path');
4+
const { Worker } = require('node:worker_threads');
5+
6+
const kTaskInfo = Symbol('kTaskInfo');
7+
const kWorkerFreedEvent = Symbol('kWorkerFreedEvent');
8+
9+
class WorkerPoolTaskInfo extends AsyncResource {
10+
constructor(callback) {
11+
super('WorkerPoolTaskInfo');
12+
this.callback = callback;
13+
}
14+
15+
done(err, result) {
16+
this.runInAsyncScope(this.callback, null, err, result);
17+
this.emitDestroy(); // `TaskInfo`s are used only once.
18+
}
19+
}
20+
21+
class WorkerPool extends EventEmitter {
22+
constructor(numThreads, fileName) {
23+
super();
24+
this.numThreads = numThreads;
25+
this.workers = [];
26+
this.freeWorkers = [];
27+
this.tasks = [];
28+
this.workerFileName = fileName
29+
30+
for (let i = 0; i < numThreads; i++)
31+
this.addNewWorker();
32+
33+
// Any time the kWorkerFreedEvent is emitted, dispatch
34+
// the next task pending in the queue, if any.
35+
this.on(kWorkerFreedEvent, () => {
36+
if (this.tasks.length > 0) {
37+
const { task, callback } = this.tasks.shift();
38+
this.runTask(task, callback);
39+
}
40+
});
41+
}
42+
43+
addNewWorker() {
44+
const worker = new Worker(path.resolve(__dirname, this.workerFileName));
45+
worker.on('message', (result) => {
46+
// In case of success: Call the callback that was passed to `runTask`,
47+
// remove the `TaskInfo` associated with the Worker, and mark it as free
48+
// again.
49+
worker[kTaskInfo].done(null, result);
50+
worker[kTaskInfo] = null;
51+
this.freeWorkers.push(worker);
52+
this.emit(kWorkerFreedEvent);
53+
});
54+
worker.on('error', (err) => {
55+
// In case of an uncaught exception: Call the callback that was passed to
56+
// `runTask` with the error.
57+
if (worker[kTaskInfo])
58+
worker[kTaskInfo].done(err, null);
59+
else
60+
this.emit('error', err);
61+
// Remove the worker from the list and start a new Worker to replace the
62+
// current one.
63+
this.workers.splice(this.workers.indexOf(worker), 1);
64+
this.addNewWorker();
65+
});
66+
this.workers.push(worker);
67+
this.freeWorkers.push(worker);
68+
this.emit(kWorkerFreedEvent);
69+
}
70+
71+
runTask(task, callback) {
72+
if (this.freeWorkers.length === 0) {
73+
// No free threads, wait until a worker thread becomes free.
74+
this.tasks.push({ task, callback });
75+
return;
76+
}
77+
78+
const worker = this.freeWorkers.pop();
79+
worker[kTaskInfo] = new WorkerPoolTaskInfo(callback);
80+
worker.postMessage(task);
81+
}
82+
83+
close() {
84+
for (const worker of this.workers) worker.terminate();
85+
}
86+
}
87+
88+
module.exports = WorkerPool;
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
const { parentPort } = require('node:worker_threads');
2+
const https = require('https')
3+
parentPort.on('message', ({ data: postData, host, key }) => {
4+
console.log(postData)
5+
var _resp = https.request(host, {
6+
method: "POST", headers: {
7+
'Content-Type': 'application/json',
8+
'Content-Length': Buffer.byteLength(postData),
9+
'Authorization': key
10+
}
11+
}, (res) => {
12+
let data = '';
13+
14+
console.log('Status Code:', res.statusCode);
15+
16+
res.on('data', (chunk) => {
17+
data += chunk;
18+
});
19+
20+
res.on('end', () => {
21+
console.log('Body: ', JSON.parse(data));
22+
});
23+
24+
}).on("error", (err) => {
25+
console.log("Error: ", err.message);
26+
})
27+
28+
_resp.write(postData)
29+
_resp.end();
30+
parentPort.postMessage("Sent Message");
31+
});

0 commit comments

Comments
 (0)