Skip to content

Commit 5218531

Browse files
feat: Logging is working, and we can search through the data
1 parent 53bf1a1 commit 5218531

File tree

8 files changed

+184
-137
lines changed

8 files changed

+184
-137
lines changed

diracx-core/src/diracx/core/models.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -324,11 +324,12 @@ class JobCommand(BaseModel):
324324

325325

326326
class LogLine(BaseModel):
327-
line_no: int
328-
line: str
327+
timestamp: str
328+
severity: str
329+
message: str
330+
scope: str
329331

330332

331333
class LogMessage(BaseModel):
332334
pilot_stamp: str
333335
lines: list[LogLine]
334-
vo: str

diracx-db/src/diracx/db/os/pilot_logs.py

Lines changed: 4 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -7,24 +7,15 @@ class PilotLogsDB(BaseOSDB):
77
fields = {
88
"PilotStamp": {"type": "keyword"},
99
"PilotID": {"type": "long"},
10-
"SubmissionTime": {"type": "date"},
11-
"LineNumber": {"type": "long"},
10+
"Severity": {"type": "keyword"},
1211
"Message": {"type": "text"},
1312
"VO": {"type": "keyword"},
14-
"timestamp": {"type": "date"},
13+
"TimeStamp": {"type": "date_nanos"},
14+
"Scope": {"type": "keyword"},
1515
}
1616
index_prefix = "pilot_logs"
1717

1818
def index_name(self, vo: str, doc_id: int) -> str:
1919
# TODO decide how to define the index name
2020
# use pilot ID
21-
return f"{self.index_prefix}_{doc_id // 1e6:.0f}"
22-
23-
24-
async def search_message(db: PilotLogsDB, search_params: list[dict]):
25-
26-
return await db.search(
27-
["Message"],
28-
search_params,
29-
[{"parameter": "LineNumber", "direction": "asc"}],
30-
)
21+
return f"{self.index_prefix}_{doc_id}"

diracx-db/src/diracx/db/os/utils.py

Lines changed: 41 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -200,13 +200,22 @@ async def upsert(self, vo: str, doc_id: int, document: Any) -> None:
200200

201201
async def bulk_insert(self, index_name: str, docs: list[dict[str, Any]]) -> None:
202202
"""Bulk inserting to database."""
203-
n_inserted = await async_bulk(
203+
n_inserted, failed = await async_bulk(
204204
self.client, actions=[doc | {"_index": index_name} for doc in docs]
205205
)
206-
logger.info("Inserted %d documents to %r", n_inserted, index_name)
206+
logger.info("Inserted %d documents to %s", n_inserted, index_name)
207+
208+
if failed:
209+
logger.error("Fail to insert %d documents to %s", failed, index_name)
207210

208211
async def search(
209-
self, parameters, search, sorts, *, per_page: int = 100, page: int | None = None
212+
self,
213+
parameters,
214+
search,
215+
sorts,
216+
*,
217+
per_page: int = 10000,
218+
page: int | None = None,
210219
) -> list[dict[str, Any]]:
211220
"""Search the database for matching results.
212221
@@ -221,7 +230,12 @@ async def search(
221230
for sort in sorts:
222231
field_name = sort["parameter"]
223232
field_type = self.fields.get(field_name, {}).get("type")
224-
require_type("sort", field_name, field_type, {"keyword", "long", "date"})
233+
require_type(
234+
"sort",
235+
field_name,
236+
field_type,
237+
{"keyword", "long", "date", "date_nanos"},
238+
)
225239
body["sort"].append({field_name: {"order": sort["direction"]}})
226240

227241
params = {}
@@ -239,7 +253,7 @@ async def search(
239253
for field_name in hit:
240254
if field_name not in self.fields:
241255
continue
242-
if self.fields[field_name]["type"] == "date":
256+
if self.fields[field_name]["type"] in ["date", "date_nanos"]:
243257
hit[field_name] = datetime.strptime(
244258
hit[field_name], "%Y-%m-%dT%H:%M:%S.%f%z"
245259
)
@@ -286,30 +300,46 @@ def apply_search_filters(db_fields, search):
286300
match operator := query["operator"]:
287301
case "eq":
288302
require_type(
289-
operator, field_name, field_type, {"keyword", "long", "date"}
303+
operator,
304+
field_name,
305+
field_type,
306+
{"keyword", "long", "date", "date_nanos"},
290307
)
291308
result["must"].append({"term": {field_name: {"value": query["value"]}}})
292309
case "neq":
293310
require_type(
294-
operator, field_name, field_type, {"keyword", "long", "date"}
311+
operator,
312+
field_name,
313+
field_type,
314+
{"keyword", "long", "date", "date_nanos"},
295315
)
296316
result["must_not"].append(
297317
{"term": {field_name: {"value": query["value"]}}}
298318
)
299319
case "gt":
300-
require_type(operator, field_name, field_type, {"long", "date"})
320+
require_type(
321+
operator, field_name, field_type, {"long", "date", "date_nanos"}
322+
)
301323
result["must"].append({"range": {field_name: {"gt": query["value"]}}})
302324
case "lt":
303-
require_type(operator, field_name, field_type, {"long", "date"})
325+
require_type(
326+
operator, field_name, field_type, {"long", "date", "date_nanos"}
327+
)
304328
result["must"].append({"range": {field_name: {"lt": query["value"]}}})
305329
case "in":
306330
require_type(
307-
operator, field_name, field_type, {"keyword", "long", "date"}
331+
operator,
332+
field_name,
333+
field_type,
334+
{"keyword", "long", "date", "date_nanos"},
308335
)
309336
result["must"].append({"terms": {field_name: query["values"]}})
310337
case "not in":
311338
require_type(
312-
operator, field_name, field_type, {"keyword", "long", "date"}
339+
operator,
340+
field_name,
341+
field_type,
342+
{"keyword", "long", "date", "date_nanos"},
313343
)
314344
result["must_not"].append({"terms": {field_name: query["values"]}})
315345
# TODO: Implement like and ilike

diracx-logic/src/diracx/logic/pilots/logging.py

Lines changed: 54 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -2,68 +2,84 @@
22

33
import logging
44

5-
from diracx.core.models import LogMessage, ScalarSearchOperator, ScalarSearchSpec
6-
from diracx.db.os.pilot_logs import PilotLogsDB, search_message
5+
from diracx.core.exceptions import PilotNotFoundError
6+
from diracx.core.models import (
7+
LogMessage,
8+
SearchParams,
9+
SortDirection,
10+
)
11+
from diracx.db.os.pilot_logs import PilotLogsDB
712
from diracx.db.sql.pilot_agents.db import PilotAgentsDB
813

914
logger = logging.getLogger(__name__)
1015

1116

17+
MAX_PER_PAGE = 10000
18+
19+
1220
async def send_message(
1321
data: LogMessage,
1422
pilot_logs_db: PilotLogsDB,
1523
pilot_agents_db: PilotAgentsDB,
16-
) -> int:
17-
18-
# get the pilot ID corresponding to a given pilot stamp, expecting exactly one row:
19-
search_params = ScalarSearchSpec(
20-
parameter="PilotStamp",
21-
operator=ScalarSearchOperator.EQUAL,
22-
value=data.pilot_stamp,
23-
)
24-
25-
total, result = await pilot_agents_db.search(
26-
["PilotID", "VO", "SubmissionTime"], [search_params], []
27-
)
28-
if total != 1:
29-
logger.error(
30-
"Cannot determine PilotID for requested PilotStamp: %r, (%d candidates)",
31-
data.pilot_stamp,
32-
total,
33-
)
34-
raise Exception(f"Number of rows !=1 {total}")
24+
vo: str,
25+
):
26+
try:
27+
pilot_ids = await pilot_agents_db.get_pilot_ids_by_stamps([data.pilot_stamp])
28+
pilot_id = pilot_ids[0] # Semantic
29+
except PilotNotFoundError:
30+
# If a pilot is not found, we still store the data (so as not to lose it)
31+
# We log it as it's not supposed to happen
32+
pilot_id = -1 # To detect
3533

36-
pilot_id, vo, submission_time = (
37-
result[0]["PilotID"],
38-
result[0]["VO"],
39-
result[0]["SubmissionTime"],
40-
)
4134
docs = []
4235
for line in data.lines:
4336
docs.append(
4437
{
4538
"PilotStamp": data.pilot_stamp,
4639
"PilotID": pilot_id,
47-
"SubmissionTime": submission_time,
4840
"VO": vo,
49-
"LineNumber": line.line_no,
50-
"Message": line.line,
41+
"Severity": line.severity,
42+
"Message": line.message,
43+
"TimeStamp": line.timestamp,
44+
"Scope": line.scope,
5145
}
5246
)
5347
# bulk insert pilot logs to OpenSearch DB:
5448
await pilot_logs_db.bulk_insert(pilot_logs_db.index_name(vo, pilot_id), docs)
55-
return pilot_id
5649

5750

58-
async def get_logs(
59-
pilot_id: int,
60-
db: PilotLogsDB,
51+
async def search_logs(
52+
vo: str,
53+
body: SearchParams | None,
54+
per_page: int,
55+
page: int,
56+
pilot_logs_db: PilotLogsDB,
6157
) -> list[dict]:
58+
"""Search pilot logs in OpenSearch, always restricted to the given VO."""
59+
# Apply a limit to per_page to prevent abuse of the API
60+
if per_page > MAX_PER_PAGE:
61+
per_page = MAX_PER_PAGE
6262

63-
search_params = [{"parameter": "PilotID", "operator": "eq", "value": pilot_id}]
63+
if body is None:
64+
body = SearchParams()
6465

65-
result = await search_message(db, search_params)
66+
search = body.search
67+
parameters = body.parameters
68+
sorts = body.sort
6669

67-
if not result:
68-
return [{"Message": f"No logs for pilot ID = {pilot_id}"}]
69-
return result
70+
# Add the vo to make sure that we filter for pilots we can see
71+
# TODO: Test it
72+
search = search + [
73+
{
74+
"parameter": "VO",
75+
"operator": "eq",
76+
"value": vo,
77+
}
78+
]
79+
80+
if not sorts:
81+
sorts = [{"parameter": "TimeStamp", "direction": SortDirection("asc")}]
82+
83+
return await pilot_logs_db.search(
84+
parameters=parameters, search=search, sorts=sorts, per_page=per_page, page=page
85+
)

diracx-routers/src/diracx/routers/pilots/access_policies.py

Lines changed: 30 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,12 @@
88
from fastapi import Depends, HTTPException, status
99

1010
from diracx.core.exceptions import PilotNotFoundError
11-
from diracx.core.models import ScalarSearchOperator, ScalarSearchSpec
12-
from diracx.core.properties import NORMAL_USER, TRUSTED_HOST
11+
from diracx.core.properties import (
12+
GENERIC_PILOT,
13+
LIMITED_DELEGATION,
14+
NORMAL_USER,
15+
TRUSTED_HOST,
16+
)
1317
from diracx.db.sql import PilotAgentsDB
1418
from diracx.routers.access_policies import BaseAccessPolicy
1519
from diracx.routers.utils.users import AuthorizedUserInfo
@@ -25,9 +29,9 @@ class ActionType(StrEnum):
2529
# Read some pilot info
2630
READ_PILOT_FIELDS = auto()
2731
#: Create/update pilot log records
28-
CREATE = auto()
32+
CREATE_LOG = auto()
2933
#: Search
30-
QUERY = auto()
34+
QUERY_LOGS = auto()
3135

3236

3337
class PilotManagementAccessPolicy(BaseAccessPolicy):
@@ -49,7 +53,6 @@ async def policy(
4953
action: ActionType | None = None,
5054
):
5155
assert action, "action is a mandatory parameter"
52-
5356
if action == ActionType.READ_PILOT_FIELDS:
5457
if NORMAL_USER in user_info.properties:
5558
return
@@ -132,60 +135,47 @@ async def policy(policy_name: str, user_info: AuthorizedUserInfo):
132135

133136
class PilotLogsAccessPolicy(BaseAccessPolicy):
134137
"""Rules:
135-
Only NORMAL_USER in a correct VO and a diracAdmin VO member can query log records.
138+
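Only NORMAL_USER can query log records (the VO restriction is added to the search in the logic layer); only GENERIC_PILOT or LIMITED_DELEGATION can create them.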
136139
All other actions and users are explicitly denied access.
137140
"""
138141

142+
# TODO: Better doc
143+
139144
@staticmethod
140145
async def policy(
141146
policy_name: str,
142147
user_info: AuthorizedUserInfo,
143148
/,
144149
*,
145150
action: ActionType | None = None,
146-
pilot_agents_db: PilotAgentsDB | None = None,
147-
pilot_id: int | None = None,
151+
pilot_db: PilotAgentsDB | None = None,
148152
):
149-
assert pilot_agents_db
150-
if action is None:
151-
raise HTTPException(
152-
status.HTTP_400_BAD_REQUEST, detail="Action is a mandatory argument"
153-
)
154-
elif action == ActionType.QUERY:
155-
if pilot_id is None:
156-
logger.error("Pilot ID value is not provided (None)")
157-
raise HTTPException(
158-
status.HTTP_400_BAD_REQUEST,
159-
detail=f"PilotID not provided: {pilot_id}",
160-
)
161-
search_params = ScalarSearchSpec(
162-
parameter="PilotID",
163-
operator=ScalarSearchOperator.EQUAL,
164-
value=pilot_id,
165-
)
166153

167-
total, result = await pilot_agents_db.search(["VO"], [search_params], [])
168-
# we expect exactly one row.
169-
if total != 1:
170-
logger.error(
171-
"Cannot determine VO for requested PilotID: %d, found %d candidates.",
172-
pilot_id,
173-
total,
174-
)
175-
raise HTTPException(
176-
status.HTTP_400_BAD_REQUEST, detail=f"PilotID not found: {pilot_id}"
177-
)
178-
vo = result[0]["VO"]
154+
assert pilot_db
155+
if action == ActionType.CREATE_LOG:
156+
pilot_info = user_info # semantic
179157

180-
if user_info.vo == "diracAdmin":
158+
if GENERIC_PILOT in pilot_info.properties:
181159
return
182160

183-
if NORMAL_USER in user_info.properties and user_info.vo == vo:
161+
if LIMITED_DELEGATION in pilot_info.properties:
162+
return
163+
164+
raise HTTPException(
165+
status_code=status.HTTP_403_FORBIDDEN,
166+
detail="You must be a pilot to create logs.",
167+
)
168+
169+
elif action == ActionType.QUERY_LOGS:
170+
# TODO: See if we move this elsewhere to separate from pilots
171+
# TODO: To see if we can access a VO, we add a new rule at the end in the logic
172+
# -> Do we add it here? (another verification)
173+
if NORMAL_USER in user_info.properties:
184174
return
185175

186176
raise HTTPException(
187177
status.HTTP_403_FORBIDDEN,
188-
detail="You don't have permission to access this pilot's log.",
178+
detail="You don't have permission to access pilots log",
189179
)
190180
else:
191181
raise NotImplementedError(action)

0 commit comments

Comments
 (0)