From b4623c30ae172f48bf73440ae93d85d7b78031e2 Mon Sep 17 00:00:00 2001 From: Ninad Sinha Date: Sat, 22 Oct 2022 01:45:43 +0530 Subject: [PATCH 1/2] Add mutex locking on shared data structures --- ingestors/suricata/index.ts | 66 +++++++++++++++++++++------------ ingestors/suricata/package.json | 3 +- ingestors/suricata/yarn.lock | 12 ++++++ 3 files changed, 57 insertions(+), 24 deletions(-) diff --git a/ingestors/suricata/index.ts b/ingestors/suricata/index.ts index fd483167..29cda0c9 100644 --- a/ingestors/suricata/index.ts +++ b/ingestors/suricata/index.ts @@ -5,10 +5,13 @@ import { conns, EVENTS, ALERT, RecordHolderWithTimestamp } from "./interface" import { program } from "commander" import { prepareResponse, compileHost, pushAlert } from "./utils" import ndjson from "ndjson" +import { Mutex, MutexInterface } from "async-mutex" var server: net.Server var connections: Record = {} var http_meta: Record> = {} +const httpMutex: MutexInterface = new Mutex() +const alertMutex: MutexInterface = new Mutex() var alerts: Record> = {} const msSendTimeout = 10 // Just offset by enough to not conflict with timeout @@ -44,10 +47,16 @@ function createServer(socket: string) { const flow_id = jsonmsg.flow_id if (EVENTS.HTTP === (jsonmsg["event_type"] as string)) { - compileHost(jsonmsg, http_meta) + httpMutex.acquire().then((release) => { + compileHost(jsonmsg, http_meta) + release() + }) } if (EVENTS.ALERT === (jsonmsg["event_type"] as string)) { - alerts[flow_id] = { value: jsonmsg, timestamp: Date.now() } + alertMutex.acquire().then((release) => { + alerts[flow_id] = { value: jsonmsg, timestamp: Date.now() } + release() + }) } } catch (err) { console.log( @@ -128,34 +137,45 @@ function main() { } function processAlerts() { - Object.entries(http_meta).forEach(([k, v]) => { - const flow_id = k - // Find if both events are present in their respective things - if (flow_id in alerts) { - - let curr_alert = alerts[flow_id].value - delete alerts[flow_id] - let curr_http = v.value.metas.shift() - - // Get first metadata for the given connection. - let resp = prepareResponse(curr_alert, curr_http) - pushAlert(resp, url, api_key) - } + httpMutex.acquire().then((release) => { + Object.entries(http_meta).forEach(([k, v]) => { + const flow_id = k + // Find if both events are present in their respective things + alertMutex.acquire().then((alertRelease) => { + if (flow_id in alerts) { + let curr_alert = alerts[flow_id].value + delete alerts[flow_id] + let curr_http = v.value.metas.shift() + + // Get first metadata for the given connection. + let resp = prepareResponse(curr_alert, curr_http) + pushAlert(resp, url, api_key) + alertRelease() + } + }) + }) + release() }) } -function cleanup() { - let new_meta = {} - Object.entries(http_meta).filter((([k, v]) => ((Date.now() - v.timestamp) > msCleanupTimeout))).forEach(([k, v]) => { new_meta[k] = v }) - http_meta = new_meta +function cleanupData() { + httpMutex.acquire().then((release) => { + let new_meta = {} + Object.entries(http_meta).filter((([k, v]) => ((Date.now() - v.timestamp) > msCleanupTimeout))).forEach(([k, v]) => { new_meta[k] = v }) + http_meta = new_meta + release() + }) - let new_alerts = {} - Object.entries(alerts).filter((([k, v]) => ((Date.now() - v.timestamp) > msCleanupTimeout))).forEach(([k, v]) => { new_meta[k] = v }) - alerts = new_alerts + alertMutex.acquire().then((release) => { + let new_alerts = {} + Object.entries(alerts).filter((([k, v]) => ((Date.now() - v.timestamp) > msCleanupTimeout))).forEach(([k, v]) => { new_alerts[k] = v }) + alerts = new_alerts + release() + }) } setInterval(processAlerts, msSendTimeout) -setInterval(cleanup, msCleanupTimeout) +setInterval(cleanupData, msCleanupTimeout) process.title = "METLO" diff --git a/ingestors/suricata/package.json b/ingestors/suricata/package.json index cbf90b4f..8c4ad4ee 100644 --- a/ingestors/suricata/package.json +++ b/ingestors/suricata/package.json @@ -6,6 +6,7 @@ "private": true, "dependencies": { "@types/node": "^18.6.5", + "async-mutex": "^0.4.0", "axios": "^0.27.2", "commander": "^9.4.0", "ndjson": "^2.0.0", @@ -21,4 +22,4 @@ "build": "tsc", "format": "prettier --write '**/*.{ts,tsx}'" } -} \ No newline at end of file +} diff --git a/ingestors/suricata/yarn.lock b/ingestors/suricata/yarn.lock index f8553eef..e8dbe40f 100644 --- a/ingestors/suricata/yarn.lock +++ b/ingestors/suricata/yarn.lock @@ -67,6 +67,13 @@ arg@^4.1.0: resolved "https://registry.yarnpkg.com/arg/-/arg-4.1.3.tgz#269fc7ad5b8e42cb63c896d5666017261c144089" integrity sha512-58S9QDqG0Xx27YwPSt9fJxivjYl432YCwfDMfZ+71RAqUrZef7LrKQZ3LHLOwCS4FLNBplP533Zx895SeOCHvA== +async-mutex@^0.4.0: + version "0.4.0" + resolved "https://registry.yarnpkg.com/async-mutex/-/async-mutex-0.4.0.tgz#ae8048cd4d04ace94347507504b3cf15e631c25f" + integrity sha512-eJFZ1YhRR8UN8eBLoNzcDPcy/jqjsg6I1AP+KvWQX80BqOSW1oJPJXDylPUEeMr2ZQvHgnQ//Lp6f3RQ1zI7HA== + dependencies: + tslib "^2.4.0" + asynckit@^0.4.0: version "0.4.0" resolved "https://registry.yarnpkg.com/asynckit/-/asynckit-0.4.0.tgz#c79ed97f7f34cb8f2ba1bc9790bcc366474b4b79" @@ -228,6 +235,11 @@ tsc@^2.0.4: resolved "https://registry.yarnpkg.com/tsc/-/tsc-2.0.4.tgz#5f6499146abea5dca4420b451fa4f2f9345238f5" integrity sha512-fzoSieZI5KKJVBYGvwbVZs/J5za84f2lSTLPYf6AGiIf43tZ3GNrI1QzTLcjtyDDP4aLxd46RTZq1nQxe7+k5Q== +tslib@^2.4.0: + version "2.4.0" + resolved "https://registry.yarnpkg.com/tslib/-/tslib-2.4.0.tgz#7cecaa7f073ce680a05847aa77be941098f36dc3" + integrity sha512-d6xOpEDfsi2CZVlPQzGeux8XMwLT9hssAsaPYExaQMuYskwb+x1x7J371tWlbBdWHroy99KnVB6qIkUbs5X3UQ== + typescript@^4.7.4: version "4.7.4" resolved "https://registry.yarnpkg.com/typescript/-/typescript-4.7.4.tgz#1a88596d1cf47d59507a1bcdfb5b9dfe4d488235" From 620b11193a252fba16e42129b39a483fa9976a28 Mon Sep 17 00:00:00 2001 From: Ninad Sinha Date: Sat, 22 Oct 2022 01:50:13 +0530 Subject: [PATCH 2/2] release mutex lock outside of if statement --- ingestors/suricata/index.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/ingestors/suricata/index.ts b/ingestors/suricata/index.ts index 29cda0c9..e0ee9ccd 100644 --- a/ingestors/suricata/index.ts +++ b/ingestors/suricata/index.ts @@ -53,7 +53,7 @@ function createServer(socket: string) { }) } if (EVENTS.ALERT === (jsonmsg["event_type"] as string)) { - alertMutex.acquire().then((release) => { + alertMutex.acquire().then((release) => { alerts[flow_id] = { value: jsonmsg, timestamp: Date.now() } release() }) @@ -150,8 +150,8 @@ function processAlerts() { // Get first metadata for the given connection. let resp = prepareResponse(curr_alert, curr_http) pushAlert(resp, url, api_key) - alertRelease() } + alertRelease() }) }) release()