Skip to content

(feature) Add mutex locking on shared data structures #46

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Oct 21, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 43 additions & 23 deletions ingestors/suricata/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,13 @@ import { conns, EVENTS, ALERT, RecordHolderWithTimestamp } from "./interface"
import { program } from "commander"
import { prepareResponse, compileHost, pushAlert } from "./utils"
import ndjson from "ndjson"
import { Mutex, MutexInterface } from "async-mutex"

var server: net.Server
var connections: Record<number, net.Socket> = {}
var http_meta: Record<string, RecordHolderWithTimestamp<conns>> = {}
const httpMutex: MutexInterface = new Mutex()
const alertMutex: MutexInterface = new Mutex()
var alerts: Record<string, RecordHolderWithTimestamp<ALERT>> = {}
const msSendTimeout = 10
// Just offset by enough to not conflict with timeout
Expand Down Expand Up @@ -44,10 +47,16 @@ function createServer(socket: string) {
const flow_id = jsonmsg.flow_id

if (EVENTS.HTTP === (jsonmsg["event_type"] as string)) {
compileHost(jsonmsg, http_meta)
httpMutex.acquire().then((release) => {
compileHost(jsonmsg, http_meta)
release()
})
}
if (EVENTS.ALERT === (jsonmsg["event_type"] as string)) {
alerts[flow_id] = { value: jsonmsg, timestamp: Date.now() }
alertMutex.acquire().then((release) => {
alerts[flow_id] = { value: jsonmsg, timestamp: Date.now() }
release()
})
}
} catch (err) {
console.log(
Expand Down Expand Up @@ -128,34 +137,45 @@ function main() {
}

function processAlerts() {
Object.entries(http_meta).forEach(([k, v]) => {
const flow_id = k
// Find if both events are present in their respective things
if (flow_id in alerts) {

let curr_alert = alerts[flow_id].value
delete alerts[flow_id]
let curr_http = v.value.metas.shift()

// Get first metadata for the given connection.
let resp = prepareResponse(curr_alert, curr_http)
pushAlert(resp, url, api_key)
}
httpMutex.acquire().then((release) => {
Object.entries(http_meta).forEach(([k, v]) => {
const flow_id = k
// Find if both events are present in their respective things
alertMutex.acquire().then((alertRelease) => {
if (flow_id in alerts) {
let curr_alert = alerts[flow_id].value
delete alerts[flow_id]
let curr_http = v.value.metas.shift()

// Get first metadata for the given connection.
let resp = prepareResponse(curr_alert, curr_http)
pushAlert(resp, url, api_key)
}
alertRelease()
})
})
release()
})
}

function cleanup() {
let new_meta = {}
Object.entries(http_meta).filter((([k, v]) => ((Date.now() - v.timestamp) > msCleanupTimeout))).forEach(([k, v]) => { new_meta[k] = v })
http_meta = new_meta
function cleanupData() {
httpMutex.acquire().then((release) => {
let new_meta = {}
Object.entries(http_meta).filter((([k, v]) => ((Date.now() - v.timestamp) > msCleanupTimeout))).forEach(([k, v]) => { new_meta[k] = v })
http_meta = new_meta
release()
})

let new_alerts = {}
Object.entries(alerts).filter((([k, v]) => ((Date.now() - v.timestamp) > msCleanupTimeout))).forEach(([k, v]) => { new_meta[k] = v })
alerts = new_alerts
alertMutex.acquire().then((release) => {
let new_alerts = {}
Object.entries(alerts).filter((([k, v]) => ((Date.now() - v.timestamp) > msCleanupTimeout))).forEach(([k, v]) => { new_alerts[k] = v })
alerts = new_alerts
release()
})
}

setInterval(processAlerts, msSendTimeout)
setInterval(cleanup, msCleanupTimeout)
setInterval(cleanupData, msCleanupTimeout)

process.title = "METLO"

Expand Down
3 changes: 2 additions & 1 deletion ingestors/suricata/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
"private": true,
"dependencies": {
"@types/node": "^18.6.5",
"async-mutex": "^0.4.0",
"axios": "^0.27.2",
"commander": "^9.4.0",
"ndjson": "^2.0.0",
Expand All @@ -21,4 +22,4 @@
"build": "tsc",
"format": "prettier --write '**/*.{ts,tsx}'"
}
}
}
12 changes: 12 additions & 0 deletions ingestors/suricata/yarn.lock
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,13 @@ arg@^4.1.0:
resolved "https://registry.yarnpkg.com/arg/-/arg-4.1.3.tgz#269fc7ad5b8e42cb63c896d5666017261c144089"
integrity sha512-58S9QDqG0Xx27YwPSt9fJxivjYl432YCwfDMfZ+71RAqUrZef7LrKQZ3LHLOwCS4FLNBplP533Zx895SeOCHvA==

async-mutex@^0.4.0:
version "0.4.0"
resolved "https://registry.yarnpkg.com/async-mutex/-/async-mutex-0.4.0.tgz#ae8048cd4d04ace94347507504b3cf15e631c25f"
integrity sha512-eJFZ1YhRR8UN8eBLoNzcDPcy/jqjsg6I1AP+KvWQX80BqOSW1oJPJXDylPUEeMr2ZQvHgnQ//Lp6f3RQ1zI7HA==
dependencies:
tslib "^2.4.0"

asynckit@^0.4.0:
version "0.4.0"
resolved "https://registry.yarnpkg.com/asynckit/-/asynckit-0.4.0.tgz#c79ed97f7f34cb8f2ba1bc9790bcc366474b4b79"
Expand Down Expand Up @@ -228,6 +235,11 @@ tsc@^2.0.4:
resolved "https://registry.yarnpkg.com/tsc/-/tsc-2.0.4.tgz#5f6499146abea5dca4420b451fa4f2f9345238f5"
integrity sha512-fzoSieZI5KKJVBYGvwbVZs/J5za84f2lSTLPYf6AGiIf43tZ3GNrI1QzTLcjtyDDP4aLxd46RTZq1nQxe7+k5Q==

tslib@^2.4.0:
version "2.4.0"
resolved "https://registry.yarnpkg.com/tslib/-/tslib-2.4.0.tgz#7cecaa7f073ce680a05847aa77be941098f36dc3"
integrity sha512-d6xOpEDfsi2CZVlPQzGeux8XMwLT9hssAsaPYExaQMuYskwb+x1x7J371tWlbBdWHroy99KnVB6qIkUbs5X3UQ==

typescript@^4.7.4:
version "4.7.4"
resolved "https://registry.yarnpkg.com/typescript/-/typescript-4.7.4.tgz#1a88596d1cf47d59507a1bcdfb5b9dfe4d488235"
Expand Down