Skip to content

Commit e3c3019

Browse files
authored
feature(optimize-collector) (#48)
1 parent 786710f commit e3c3019

30 files changed

+890
-372
lines changed

backend/package.json

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,14 +11,17 @@
1111
"dev": "NODE_ENV=development nodemon -r tsconfig-paths/register src/index.ts",
1212
"dev-jobs": "NODE_ENV=development nodemon -r tsconfig-paths/register src/jobs.ts",
1313
"dev-collector": "NODE_ENV=development nodemon -r tsconfig-paths/register src/collector.ts",
14+
"dev-analyzer": "NODE_ENV=development nodemon -r tsconfig-paths/register src/analyzer.ts",
1415
"sync-endpoints": "NODE_ENV=development ts-node -r tsconfig-paths/register src/scripts/generate-endpoints.ts",
1516
"generate-attacks": "NODE_ENV=development ts-node -r tsconfig-paths/register src/scripts/generate-attacks.ts",
1617
"generate-alerts": "NODE_ENV=development ts-node -r tsconfig-paths/register src/scripts/generate-alerts.ts",
1718
"start": "TS_NODE_BASEURL=./dist node -r tsconfig-paths/register dist/index.js",
1819
"start-jobs": "TS_NODE_BASEURL=./dist node -r tsconfig-paths/register dist/jobs.js --max-old-space-size=2048",
1920
"start-collector": "TS_NODE_BASEURL=./dist/ node -r tsconfig-paths/register dist/collector.js",
21+
"start-analyzer": "TS_NODE_BASEURL=./dist/ node -r tsconfig-paths/register dist/analyzer.js",
2022
"format": "prettier --write './src/**/*.{ts,tsx}'",
21-
"migration:create": "typeorm migration:create"
23+
"migration:create": "typeorm migration:create",
24+
"migration:run": "ts-node -r tsconfig-paths/register ./node_modules/.bin/typeorm migration:run -d src/data-source.ts"
2225
},
2326
"keywords": [],
2427
"author": "",
@@ -38,6 +41,7 @@
3841
"aws-sdk": "^2.1189.0",
3942
"axios": "^0.27.2",
4043
"body-parser": "^1.20.0",
44+
"cluster": "^0.7.7",
4145
"connect-typeorm": "^2.0.0",
4246
"dotenv": "^16.0.1",
4347
"express": "^4.18.1",
@@ -57,6 +61,7 @@
5761
"parse-multipart-data": "^1.4.0",
5862
"pg": "^8.7.3",
5963
"pg-protocol": "^1.5.0",
64+
"piscina": "^3.2.0",
6065
"reflect-metadata": "^0.1.13",
6166
"semaphore": "^1.1.0",
6267
"swagger2openapi": "^7.0.8",

backend/src/analyze-traces.ts

Lines changed: 233 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,233 @@
1+
import { v4 as uuidv4 } from "uuid"
2+
import { AppDataSource } from "data-source"
3+
import { ApiTrace, ApiEndpoint, DataField, Alert } from "models"
4+
import { DataFieldService } from "services/data-field"
5+
import { SpecService } from "services/spec"
6+
import { AlertService } from "services/alert"
7+
import { DatabaseService } from "services/database"
8+
import { RedisClient } from "utils/redis"
9+
import { TRACES_QUEUE } from "~/constants"
10+
import { QueryRunner, Raw } from "typeorm"
11+
import { QueuedApiTrace } from "@common/types"
12+
import { isSuspectedParamater, skipAutoGeneratedMatch } from "utils"
13+
import { getPathTokens } from "@common/utils"
14+
import { AlertType } from "@common/enums"
15+
16+
/**
 * Pops the next queued trace from the Redis list named by `TRACES_QUEUE`
 * and parses it from JSON.
 *
 * @returns The parsed trace, or `null` when nothing was popped or when the
 *   pop/parse failed. Failures are logged rather than silently discarded so
 *   that a broken Redis connection or a malformed payload is visible.
 */
const getQueuedApiTrace = async (): Promise<QueuedApiTrace | null> => {
  try {
    const traceString = await RedisClient.popValueFromRedisList(TRACES_QUEUE)
    // Nothing popped (presumably an empty queue — confirm against
    // RedisClient): not an error, so return quietly without logging.
    if (!traceString) {
      return null
    }
    return JSON.parse(traceString)
  } catch (err) {
    console.error(`Failed to read trace from queue: ${err}`)
    return null
  }
}
24+
25+
/**
 * Analyzes one trace against its endpoint and persists the results.
 *
 * Steps performed:
 *  1. Updates the endpoint's dates from the trace timestamp.
 *  2. Collects data fields, OpenAPI spec diff alerts, data-field alerts and,
 *     for a new endpoint, a NEW_ENDPOINT alert.
 *  3. In a single transaction on `queryRunner`: inserts the trace, upserts
 *     the data fields, inserts the alerts (ignoring conflicts) and updates
 *     the endpoint's `firstDetected`, `lastActive` and `riskScore`.
 *
 * The transaction is NOT rolled back here on failure; the error propagates
 * and the callers in this file roll back when `isTransactionActive` is set.
 *
 * @param trace - The queued trace to persist.
 * @param apiEndpoint - The endpoint the trace belongs to.
 * @param queryRunner - Connected query runner used for all statements.
 * @param newEndpoint - When truthy, a NEW_ENDPOINT alert is also created.
 */
const analyze = async (
  trace: QueuedApiTrace,
  apiEndpoint: ApiEndpoint,
  queryRunner: QueryRunner,
  newEndpoint?: boolean,
) => {
  apiEndpoint.updateDates(trace.createdAt)
  const dataFields = DataFieldService.findAllDataFields(trace, apiEndpoint)
  const specDiffAlerts = await SpecService.findOpenApiSpecDiff(
    trace,
    apiEndpoint,
    queryRunner,
  )
  const sensitiveDataAlerts = await AlertService.createDataFieldAlerts(
    dataFields,
    apiEndpoint.uuid,
    apiEndpoint.path,
    trace,
    queryRunner,
  )

  // Normalise to arrays before concatenating. Previously a nullish spec-diff
  // result made `alerts?.concat(...)` evaluate to undefined, which dropped the
  // data-field and new-endpoint alerts and handed `undefined` to `.values()`.
  let alerts = (specDiffAlerts ?? []).concat(sensitiveDataAlerts ?? [])
  if (newEndpoint) {
    const newEndpointAlert = await AlertService.createAlert(
      AlertType.NEW_ENDPOINT,
      apiEndpoint,
    )
    // Skip a nullish result so no empty row is handed to the insert.
    if (newEndpointAlert) {
      alerts = alerts.concat(newEndpointAlert)
    }
  }

  await queryRunner.startTransaction()
  // Each statement goes through retryTypeormTransaction with 5 as its second
  // argument (presumably the maximum number of attempts — confirm in
  // DatabaseService).
  await DatabaseService.retryTypeormTransaction(
    () =>
      queryRunner.manager.insert(ApiTrace, {
        ...trace,
        apiEndpointUuid: apiEndpoint.uuid,
      }),
    5,
  )
  // Upsert data fields: on a conflict over (dataSection, dataPath,
  // apiEndpointUuid) the listed columns are overwritten.
  await DatabaseService.retryTypeormTransaction(
    () =>
      queryRunner.manager
        .createQueryBuilder()
        .insert()
        .into(DataField)
        .values(dataFields)
        .orUpdate(
          [
            "dataClasses",
            "scannerIdentified",
            "dataType",
            "dataTag",
            "matches",
          ],
          ["dataSection", "dataPath", "apiEndpointUuid"],
        )
        .execute(),
    5,
  )
  // Alerts that conflict with an existing row are ignored.
  await DatabaseService.retryTypeormTransaction(
    () =>
      queryRunner.manager
        .createQueryBuilder()
        .insert()
        .into(Alert)
        .values(alerts)
        .orIgnore()
        .execute(),
    5,
  )
  await DatabaseService.retryTypeormTransaction(
    () =>
      queryRunner.manager
        .createQueryBuilder()
        .update(ApiEndpoint)
        .set({
          firstDetected: apiEndpoint.firstDetected,
          lastActive: apiEndpoint.lastActive,
          riskScore: apiEndpoint.riskScore,
        })
        .where("uuid = :id", { id: apiEndpoint.uuid })
        .execute(),
    5,
  )
  await queryRunner.commitTransaction()
}
110+
111+
/**
 * Creates a new endpoint for a trace that matched no existing endpoint, then
 * analyzes the trace against it.
 *
 * The trace path is tokenized; tokens flagged by `isSuspectedParamater` are
 * replaced with `{paramN}` in the stored path and `[^/]+` in the stored
 * regex. If the resulting regex is empty nothing is created.
 *
 * If the insert (or the following analysis) fails with Postgres error code
 * 23505 (unique_violation), the endpoint is assumed to already exist: it is
 * loaded by the parameterized path, host and method, and the trace is
 * analyzed against it instead. Any other error is logged and swallowed.
 *
 * @param trace - The queued trace that had no matching endpoint.
 * @param queryRunner - Connected query runner used for all statements.
 */
const generateEndpoint = async (
  trace: QueuedApiTrace,
  queryRunner: QueryRunner,
): Promise<void> => {
  const pathTokens = getPathTokens(trace.path)
  let paramNum = 1
  let parameterizedPath = ""
  let pathRegex = String.raw``
  for (const tokenString of pathTokens) {
    if (tokenString === "/") {
      parameterizedPath += "/"
      pathRegex += "/"
    } else if (tokenString.length > 0) {
      if (isSuspectedParamater(tokenString)) {
        parameterizedPath += `/{param${paramNum}}`
        pathRegex += String.raw`/[^/]+`
        paramNum += 1
      } else {
        parameterizedPath += `/${tokenString}`
        // NOTE(review): the token is embedded in the regex unescaped, so
        // regex metacharacters in a path segment (e.g. ".") are not matched
        // literally — confirm whether other pathRegex producers escape.
        pathRegex += String.raw`/${tokenString}`
      }
    }
  }
  if (pathRegex.length === 0) {
    return
  }

  // Anchor the pattern and tolerate any number of trailing slashes.
  pathRegex = String.raw`^${pathRegex}(/)*$`
  const apiEndpoint = new ApiEndpoint()
  apiEndpoint.uuid = uuidv4()
  apiEndpoint.path = parameterizedPath
  apiEndpoint.pathRegex = pathRegex
  apiEndpoint.host = trace.host
  apiEndpoint.method = trace.method
  apiEndpoint.addNumberParams()
  apiEndpoint.dataFields = []

  try {
    await queryRunner.startTransaction()
    await DatabaseService.retryTypeormTransaction(
      () =>
        queryRunner.manager
          .createQueryBuilder()
          .insert()
          .into(ApiEndpoint)
          .values(apiEndpoint)
          .execute(),
      5,
    )
    await queryRunner.commitTransaction()
    await analyze(trace, apiEndpoint, queryRunner, true)
  } catch (err) {
    // Roll back exactly once. The previous code rolled back a second time in
    // the non-duplicate branch, which throws because no transaction is active
    // any more.
    if (queryRunner.isTransactionActive) {
      await queryRunner.rollbackTransaction()
    }
    if (DatabaseService.isQueryFailedError(err) && err.code === "23505") {
      // Look up by the parameterized path: that is the value that was
      // inserted above. Looking up by the raw trace path never finds the row
      // when the path contains a detected parameter.
      const existingEndpoint = await queryRunner.manager.findOne(ApiEndpoint, {
        where: {
          path: parameterizedPath,
          host: trace.host,
          method: trace.method,
        },
        relations: { dataFields: true },
      })
      if (existingEndpoint) {
        await analyze(trace, existingEndpoint, queryRunner)
      }
    } else {
      console.error(`Error generating new endpoint: ${err}`)
    }
  }
}
186+
187+
/**
 * Worker entry point: initializes the data source, then loops forever
 * popping traces from the queue and analyzing them.
 *
 * For each trace the endpoint is looked up by matching the trace path against
 * stored `pathRegex` values for the same method and host, taking the match
 * with the fewest `numberParams`. A trace with a usable match is analyzed
 * against it; otherwise a new endpoint is generated.
 *
 * Errors from a single trace are logged and the loop continues. This function
 * only returns if the data source reports it is not initialized; it rejects
 * if initialization or (re)connecting the query runner throws.
 */
const analyzeTraces = async (): Promise<void> => {
  const datasource = await AppDataSource.initialize()
  if (!datasource.isInitialized) {
    console.error("Couldn't initialize datasource...")
    return
  }
  console.log("AppDataSource Initialized...")
  console.log("Running Analyzer...")
  let queryRunner = AppDataSource.createQueryRunner()
  await queryRunner.connect()
  while (true) {
    try {
      const trace = await getQueuedApiTrace()
      if (trace) {
        // createdAt arrives as a JSON string; convert it back to a Date.
        trace.createdAt = new Date(trace.createdAt)
        const apiEndpoint = await queryRunner.manager.findOne(ApiEndpoint, {
          where: {
            pathRegex: Raw(alias => `:path ~ ${alias}`, { path: trace.path }),
            method: trace.method,
            host: trace.host,
          },
          relations: { openapiSpec: true, dataFields: true },
          order: {
            numberParams: "ASC",
          },
        })
        if (apiEndpoint && !skipAutoGeneratedMatch(apiEndpoint, trace.path)) {
          await analyze(trace, apiEndpoint, queryRunner)
        } else {
          await generateEndpoint(trace, queryRunner)
        }
      }
    } catch (err) {
      console.error(`Encountered error while analyzing traces: ${err}`)
      if (queryRunner.isTransactionActive) {
        try {
          await queryRunner.rollbackTransaction()
        } catch (rollbackErr) {
          // A failed rollback must not escape the loop and kill the worker.
          // Release the runner so the `finally` block below replaces it with
          // a freshly connected one.
          console.error(`Failed to roll back transaction: ${rollbackErr}`)
          try {
            await queryRunner.release()
          } catch (releaseErr) {
            console.error(`Failed to release query runner: ${releaseErr}`)
          }
        }
      }
    } finally {
      if (queryRunner.isReleased) {
        queryRunner = AppDataSource.createQueryRunner()
        await queryRunner.connect()
      }
    }
  }
}

export default analyzeTraces

backend/src/analyzer.ts

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
import path from "path"
2+
import Piscina from "piscina"
3+
4+
/**
 * Starts the analyzer workers.
 *
 * Reads the worker count from the `NUM_WORKERS` environment variable
 * (default 1) and runs that many instances of the compiled
 * `analyze-traces.js` module in a Piscina thread pool, waiting on all of
 * them.
 */
const main = async () => {
  const requested = Number.parseInt(process.env.NUM_WORKERS || "1", 10)
  // A non-numeric value parses to NaN, which previously produced an empty
  // task list and silently started no workers. Fall back to one worker.
  const numWorkers = Number.isInteger(requested) && requested > 0 ? requested : 1
  if (numWorkers !== requested) {
    console.warn(
      `Invalid NUM_WORKERS value "${process.env.NUM_WORKERS}", defaulting to 1`,
    )
  }

  // Every task waits on the same Promise.all and is not expected to finish,
  // so the pool needs one thread per task: a task queued behind maxThreads
  // would otherwise never start.
  const pool = new Piscina({
    minThreads: numWorkers,
    maxThreads: numWorkers,
  })
  const options = {
    filename: path.resolve(__dirname, "analyze-traces.js"),
  }
  const analyzers = Array.from({ length: numWorkers }, () =>
    pool.run({}, options),
  )
  await Promise.all(analyzers)
}

// Surface a worker failure and exit non-zero instead of leaving a floating
// promise, so a process supervisor can restart the analyzer.
main().catch(err => {
  console.error(`Analyzer terminated: ${err}`)
  process.exit(1)
})

0 commit comments

Comments
 (0)