Skip to content

Commit 573052e

Browse files
feat: Add pilot logging
1 parent d5f4676 commit 573052e

File tree

14 files changed

+662
-12
lines changed

14 files changed

+662
-12
lines changed

diracx-core/src/diracx/core/models.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -352,3 +352,15 @@ class PilotRefreshTokenPayload(BaseTokenPayload):
352352

353353
class PilotCredentialsInfo(PilotSecretsInfo):
354354
pilot_stamp: str
355+
356+
357+
class LogLine(BaseModel):
    """One log record sent by a pilot, carried inside a ``LogMessage``."""

    # When the line was produced, as a string.
    # NOTE(review): presumably a format accepted by an OpenSearch date field
    # (e.g. ISO 8601) — confirm against the pilot-side logger.
    timestamp: str
    # Severity label of the record, kept as free-form text.
    severity: str
    # The log text itself.
    message: str
    # Free-form label for the part of the pilot that produced the record.
    # NOTE(review): exact semantics are not visible here — confirm.
    scope: str
362+
363+
364+
class LogMessage(BaseModel):
    """A batch of log lines sent by a single pilot."""

    # Stamp identifying the pilot that produced the lines.
    pilot_stamp: str
    # The log records contained in this batch.
    lines: list[LogLine]

diracx-db/pyproject.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ SandboxMetadataDB = "diracx.db.sql:SandboxMetadataDB"
3333
TaskQueueDB = "diracx.db.sql:TaskQueueDB"
3434

3535
[project.entry-points."diracx.dbs.os"]
36+
PilotLogsDB = "diracx.db.os:PilotLogsDB"
3637
JobParametersDB = "diracx.db.os:JobParametersDB"
3738

3839
[build-system]
Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
from __future__ import annotations
22

3-
__all__ = ("JobParametersDB",)
3+
__all__ = ("JobParametersDB", "PilotLogsDB")
44

55
from .job_parameters import JobParametersDB
6+
from .pilot_logs import PilotLogsDB
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
from __future__ import annotations
2+
3+
from diracx.db.os.utils import BaseOSDB
4+
5+
6+
class PilotLogsDB(BaseOSDB):
    """OpenSearch database holding the log lines sent by pilots."""

    # Mapping of document field name to its OpenSearch field definition.
    fields = {
        "PilotStamp": {"type": "keyword"},
        "PilotID": {"type": "long"},
        "Severity": {"type": "keyword"},
        "Message": {"type": "text"},
        "VO": {"type": "keyword"},
        "TimeStamp": {"type": "date_nanos"},
        "Scope": {"type": "keyword"},
    }
    index_prefix = "pilot_logs"

    def index_name(self, vo: str, doc_id: int) -> str:
        """Return the name of the index that stores documents for ``doc_id``.

        The name is ``<index_prefix>_<doc_id>``; callers pass the pilot ID as
        ``doc_id``. ``vo`` is accepted but currently not part of the name.
        """
        # TODO decide how to define the index name (for now: one per pilot ID)
        return "_".join((self.index_prefix, str(doc_id)))

diracx-db/src/diracx/db/os/utils.py

Lines changed: 36 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
from __future__ import annotations
22

3+
from opensearchpy.helpers import async_bulk
4+
35
__all__ = ("BaseOSDB",)
46

57
import contextlib
@@ -197,13 +199,35 @@ async def upsert(self, vo: str, doc_id: int, document: Any) -> None:
197199
response,
198200
)
199201

202+
async def bulk_insert(self, index_name: str, docs: list[dict[str, Any]]) -> None:
    """Insert several documents into one index using the bulk helper.

    Args:
        index_name: Name of the index every document is written to.
        docs: Document bodies. Each one is copied and tagged with
            ``_index`` before being sent, so the caller's dictionaries
            are left untouched.
    """
    actions = [doc | {"_index": index_name} for doc in docs]
    n_inserted, failed = await async_bulk(self.client, actions=actions)
    logger.info("Inserted %d documents to %s", n_inserted, index_name)

    if failed:
        # The helper returns the list of per-document errors unless it is
        # called with stats_only=True (then it returns a count). Formatting
        # a list with %d would make the logging call itself fail, so log
        # the number of failures instead.
        n_failed = failed if isinstance(failed, int) else len(failed)
        logger.error("Fail to insert %d documents to %s", n_failed, index_name)
211+
200212
async def search(
201-
self, parameters, search, sorts, *, per_page: int = 100, page: int | None = None
202-
) -> list[dict[str, Any]]:
213+
self,
214+
parameters,
215+
search,
216+
sorts,
217+
*,
218+
per_page: int = 10000,
219+
page: int | None = None,
220+
) -> tuple[int, list[dict[str, Any]]]:
203221
"""Search the database for matching results.
204222
205223
See the DiracX search API documentation for details.
206224
"""
225+
if page:
226+
if page < 1:
227+
raise InvalidQueryError("Page must be a positive integer")
228+
if per_page < 1:
229+
raise InvalidQueryError("Per page must be a positive integer")
230+
207231
body = {}
208232
if parameters:
209233
body["_source"] = parameters
@@ -213,7 +237,12 @@ async def search(
213237
for sort in sorts:
214238
field_name = sort["parameter"]
215239
field_type = self.fields.get(field_name, {}).get("type")
216-
require_type("sort", field_name, field_type, {"keyword", "long", "date"})
240+
require_type(
241+
"sort",
242+
field_name,
243+
field_type,
244+
{"keyword", "long", "date", "date_nanos"},
245+
)
217246
body["sort"].append({field_name: {"order": sort["direction"]}})
218247

219248
params = {}
@@ -226,17 +255,19 @@ async def search(
226255
)
227256
hits = [hit["_source"] for hit in response["hits"]["hits"]]
228257

258+
total_hits = response["hits"]["total"]["value"]
259+
229260
# Dates are returned as strings, convert them to Python datetimes
230261
for hit in hits:
231262
for field_name in hit:
232263
if field_name not in self.fields:
233264
continue
234-
if self.fields[field_name]["type"] == "date":
265+
if self.fields[field_name]["type"] in ["date", "date_nanos"]:
235266
hit[field_name] = datetime.strptime(
236267
hit[field_name], "%Y-%m-%dT%H:%M:%S.%f%z"
237268
)
238269

239-
return hits
270+
return total_hits, hits
240271

241272

242273
def require_type(operator, field_name, field_type, allowed_types):

diracx-logic/src/diracx/logic/pilots/query.py

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,11 @@
77
ScalarSearchOperator,
88
ScalarSearchSpec,
99
SearchParams,
10+
SortDirection,
1011
VectorSearchOperator,
1112
VectorSearchSpec,
1213
)
14+
from diracx.db.os.pilot_logs import PilotLogsDB
1315
from diracx.db.sql import PilotAgentsDB
1416

1517
MAX_PER_PAGE = 10000
@@ -176,3 +178,40 @@ async def get_secrets_by_uuid_bulk(
176178
)
177179

178180
return secrets
181+
182+
183+
async def search_logs(
    vo: str,
    body: SearchParams | None,
    per_page: int,
    page: int,
    pilot_logs_db: PilotLogsDB,
) -> tuple[int, list[dict]]:
    """Search the pilot logs stored in OpenSearch, restricted to one VO.

    Args:
        vo: Virtual organisation; an ``eq`` filter on the ``VO`` field is
            always appended to the search.
        body: Search, projection and sort specification. ``None`` means a
            default ``SearchParams()``.
        per_page: Requested page size, capped at ``MAX_PER_PAGE``.
        page: Page number forwarded to the database search.
        pilot_logs_db: Database the query is run against.

    Returns:
        Whatever ``pilot_logs_db.search`` returns for the query.
    """
    # Cap the page size to prevent abuse of the API
    per_page = min(per_page, MAX_PER_PAGE)

    query = SearchParams() if body is None else body

    # Always filter on the VO so that only visible pilots are returned
    # TODO: Test it
    vo_filter = {"parameter": "VO", "operator": "eq", "value": vo}
    search = query.search + [vo_filter]

    # Without an explicit sort, return the lines in chronological order
    sorts = query.sort or [
        {"parameter": "TimeStamp", "direction": SortDirection("asc")}
    ]

    return await pilot_logs_db.search(
        parameters=query.parameters,
        search=search,
        sorts=sorts,
        per_page=per_page,
        page=page,
    )
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
"""File dedicated to logic for pilot only resources (logs, jobs, etc.)."""
2+
3+
from __future__ import annotations
4+
5+
import logging
6+
7+
from diracx.core.exceptions import PilotNotFoundError
8+
from diracx.core.models import (
9+
LogMessage,
10+
)
11+
from diracx.db.os.pilot_logs import PilotLogsDB
12+
from diracx.db.sql.pilots.db import PilotAgentsDB
13+
14+
from .query import get_pilot_ids_by_stamps
15+
16+
logger = logging.getLogger(__name__)
17+
18+
19+
MAX_PER_PAGE = 10000
20+
21+
22+
async def send_message(
    data: LogMessage,
    pilot_logs_db: PilotLogsDB,
    pilot_db: PilotAgentsDB,
    vo: str,
) -> None:
    """Store the log lines sent by a pilot in the pilot logs database.

    The pilot ID is looked up from the pilot stamp. If the pilot is unknown
    the lines are still stored, with ``PilotID`` set to ``-1``, so that no
    data is lost.

    Args:
        data: The pilot stamp and the log lines to store.
        pilot_logs_db: OpenSearch database the lines are written to.
        pilot_db: Database used to resolve the pilot stamp to a pilot ID.
        vo: Virtual organisation recorded on every stored line.
    """
    try:
        pilot_ids = await get_pilot_ids_by_stamps(
            pilot_db=pilot_db, pilot_stamps=[data.pilot_stamp]
        )
        pilot_id = pilot_ids[0]  # A single stamp was requested
    except PilotNotFoundError:
        # Not supposed to happen: keep the data (so it is not lost) but
        # leave a trace and use a sentinel ID that is easy to detect.
        logger.warning(
            "Pilot with stamp %s not found, storing its logs with PilotID -1",
            data.pilot_stamp,
        )
        pilot_id = -1

    docs = [
        {
            "PilotStamp": data.pilot_stamp,
            "PilotID": pilot_id,
            "VO": vo,
            "Severity": line.severity,
            "Message": line.message,
            "TimeStamp": line.timestamp,
            "Scope": line.scope,
        }
        for line in data.lines
    ]
    # Bulk insert the pilot logs into the OpenSearch DB
    await pilot_logs_db.bulk_insert(pilot_logs_db.index_name(vo, pilot_id), docs)

diracx-routers/src/diracx/routers/dependencies.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
from diracx.core.settings import DevelopmentSettings as _DevelopmentSettings
2424
from diracx.core.settings import SandboxStoreSettings as _SandboxStoreSettings
2525
from diracx.db.os import JobParametersDB as _JobParametersDB
26+
from diracx.db.os import PilotLogsDB as _PilotLogsDB
2627
from diracx.db.sql import AuthDB as _AuthDB
2728
from diracx.db.sql import JobDB as _JobDB
2829
from diracx.db.sql import JobLoggingDB as _JobLoggingDB
@@ -50,7 +51,7 @@ def add_settings_annotation(cls: T) -> T:
5051

5152
# Opensearch databases
5253
JobParametersDB = Annotated[_JobParametersDB, Depends(_JobParametersDB.session)]
53-
54+
PilotLogsDB = Annotated[_PilotLogsDB, Depends(_PilotLogsDB.session)]
5455

5556
# Miscellaneous
5657
Config = Annotated[_Config, Depends(ConfigSource.create)]

diracx-routers/src/diracx/routers/pilots/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,12 @@
66
from .auth import router as auth_router
77
from .management import router as management_router
88
from .query import router as query_router
9+
from .resources import router as resources_router
910

1011
logger = logging.getLogger(__name__)
1112

1213
router = DiracxRouter(require_auth=False)
1314
router.include_router(management_router)
1415
router.include_router(query_router)
1516
router.include_router(auth_router)
17+
router.include_router(resources_router)

diracx-routers/src/diracx/routers/pilots/access_policies.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@ class ActionType(StrEnum):
2323
CHANGE_PILOT_FIELD = auto()
2424
# Read some pilot info
2525
READ_PILOT_FIELDS = auto()
26+
# Read pilot logs
27+
READ_PILOT_LOGS = auto()
2628

2729

2830
class PilotManagementAccessPolicy(BaseAccessPolicy):
@@ -44,7 +46,7 @@ async def policy(
4446
):
4547
assert action, "action is a mandatory parameter"
4648

47-
if action == ActionType.READ_PILOT_FIELDS:
49+
if action in {ActionType.READ_PILOT_FIELDS, ActionType.READ_PILOT_LOGS}:
4850
if NORMAL_USER in user_info.properties:
4951
return
5052

0 commit comments

Comments
 (0)