Skip to content

Pilot logging #511

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 9 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions diracx-core/src/diracx/core/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -271,3 +271,14 @@ class JobCommand(BaseModel):
job_id: int
command: Literal["Kill"]
arguments: str | None = None


# A single line of pilot log output paired with its line number.
class LogLine(BaseModel):
    # Line number attached to this log line.
    line_no: int
    # Text content of the log line.
    line: str


# A batch of log lines submitted for one pilot.
class LogMessage(BaseModel):
    # Stamp naming the pilot the lines belong to.
    pilot_stamp: str
    # Log lines carried by this message.
    lines: list[LogLine]
    # NOTE(review): presumably the virtual organisation of the sender - confirm.
    vo: str
1 change: 1 addition & 0 deletions diracx-db/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ TaskQueueDB = "diracx.db.sql:TaskQueueDB"

[project.entry-points."diracx.dbs.os"]
JobParametersDB = "diracx.db.os:JobParametersDB"
PilotLogsDB = "diracx.db.os:PilotLogsDB"

[tool.setuptools.packages.find]
where = ["src"]
Expand Down
6 changes: 5 additions & 1 deletion diracx-db/src/diracx/db/os/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
from __future__ import annotations

__all__ = ("JobParametersDB",)
__all__ = (
"JobParametersDB",
"PilotLogsDB",
)

from .job_parameters import JobParametersDB
from .pilot_logs import PilotLogsDB
30 changes: 30 additions & 0 deletions diracx-db/src/diracx/db/os/pilot_logs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
from __future__ import annotations

from diracx.db.os.utils import BaseOSDB


class PilotLogsDB(BaseOSDB):
    """OpenSearch database holding pilot log documents."""

    # Document fields and the OpenSearch type each one is mapped to.
    fields = {
        "PilotStamp": {"type": "keyword"},
        "PilotID": {"type": "long"},
        "SubmissionTime": {"type": "date"},
        "LineNumber": {"type": "long"},
        "Message": {"type": "text"},
        "VO": {"type": "keyword"},
        "timestamp": {"type": "date"},
    }
    # Common prefix of every index name produced by index_name().
    index_prefix = "pilot_logs"

    def index_name(self, doc_id: int) -> str:
        """Return the index name used for the document identified by ``doc_id``.

        IDs are bucketed by the million: 0..999_999 map to ``pilot_logs_0``,
        1_000_000..1_999_999 to ``pilot_logs_1`` and so on.
        """
        # TODO decide how to define the index name
        # use pilot ID
        # Integer floor division keeps the bucket exact for every int; the
        # float form (``// 1e6`` rendered with ``.0f``) starts rounding once
        # IDs exceed 2**53.
        return f"{self.index_prefix}_{doc_id // 1_000_000}"


async def search_message(db: PilotLogsDB, search_params: list[dict]):
    """Run ``db.search`` for the ``Message`` parameter, sorted by line number.

    :param db: Pilot logs database to query.
    :param search_params: Search specifications forwarded unchanged to ``db.search``.
    :return: Whatever ``db.search`` returns for the request.
    """
    requested_parameters = ["Message"]
    # Ascending LineNumber so the lines come back in log order.
    ordering = [{"parameter": "LineNumber", "direction": "asc"}]
    return await db.search(requested_parameters, search_params, ordering)
19 changes: 19 additions & 0 deletions diracx-db/src/diracx/db/os/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from typing import Any, Self

from opensearchpy import AsyncOpenSearch
from opensearchpy.helpers import async_bulk

from diracx.core.exceptions import InvalidQueryError
from diracx.core.extensions import select_from_extension
Expand Down Expand Up @@ -197,6 +198,13 @@ async def upsert(self, vo: str, doc_id: int, document: Any) -> None:
response,
)

async def bulk_insert(self, index_name: str, docs: list[dict[str, Any]]) -> None:
    """Insert several documents into one index with a single bulk request.

    :param index_name: Index every document is written to.
    :param docs: Documents to insert. They are not modified; each action is a
        copy of the document extended with the ``_index`` key.
    """
    actions = [doc | {"_index": index_name} for doc in docs]
    # async_bulk returns a ``(success_count, errors)`` tuple. Passing the
    # whole tuple to the "%d" placeholder makes the log record fail to
    # format, so only the success count is logged.
    n_inserted, _ = await async_bulk(self.client, actions=actions)
    logger.info("Inserted %d documents to %r", n_inserted, index_name)

async def search(
self, parameters, search, sorts, *, per_page: int = 100, page: int | None = None
) -> list[dict[str, Any]]:
Expand Down Expand Up @@ -238,6 +246,17 @@ async def search(

return hits

async def delete(self, query: list[dict[str, Any]]) -> dict:
    """Delete multiple documents by query.

    :param query: Search specifications selecting the documents to delete.
    :return: The ``delete_by_query`` response, or an empty dict when ``query``
        is empty (no request is sent in that case).
    """
    if not query:
        return {}
    request_body = {"query": apply_search_filters(self.fields, query)}
    # The wildcard targets every index sharing this database's prefix.
    return await self.client.delete_by_query(
        body=request_body, index=f"{self.index_prefix}*"
    )


def require_type(operator, field_name, field_type, allowed_types):
if field_type not in allowed_types:
Expand Down
19 changes: 4 additions & 15 deletions diracx-db/src/diracx/db/sql/job/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
from diracx.core.exceptions import InvalidQueryError
from diracx.core.models import JobCommand, SearchSpec, SortSpec

from ..utils import BaseSQLDB, apply_search_filters, apply_sort_constraints
from ..utils import BaseSQLDB, apply_search_filters, apply_sort_constraints, get_columns
from ..utils.functions import utcnow
from .schema import (
HeartBeatLoggingInfo,
Expand All @@ -25,17 +25,6 @@
)


def _get_columns(table, parameters):
    """Return the columns of ``table``, optionally restricted to ``parameters``.

    A falsy ``parameters`` (``None`` or empty) selects every column. Otherwise
    only the columns whose name is listed are returned, in table order.

    :raises InvalidQueryError: if a requested name is not a column key of the table.
    """
    every_column = list(table.columns)
    if not parameters:
        return every_column

    unrecognised_parameters = set(parameters) - set(table.columns.keys())
    if unrecognised_parameters:
        raise InvalidQueryError(
            f"Unrecognised parameters requested {unrecognised_parameters}"
        )
    return [column for column in every_column if column.name in parameters]


class JobDB(BaseSQLDB):
metadata = JobDBBase.metadata

Expand All @@ -56,7 +45,7 @@ class JobDB(BaseSQLDB):

async def summary(self, group_by, search) -> list[dict[str, str | int]]:
"""Get a summary of the jobs."""
columns = _get_columns(Jobs.__table__, group_by)
columns = get_columns(Jobs.__table__, group_by)

stmt = select(*columns, func.count(Jobs.job_id).label("count"))
stmt = apply_search_filters(Jobs.__table__.columns.__getitem__, stmt, search)
Expand All @@ -81,7 +70,7 @@ async def search(
) -> tuple[int, list[dict[Any, Any]]]:
"""Search for jobs in the database."""
# Find which columns to select
columns = _get_columns(Jobs.__table__, parameters)
columns = get_columns(Jobs.__table__, parameters)

stmt = select(*columns)

Expand Down Expand Up @@ -267,7 +256,7 @@ async def set_properties(
required_parameters = list(required_parameters_set)[0]
update_parameters = [{"job_id": k, **v} for k, v in properties.items()]

columns = _get_columns(Jobs.__table__, required_parameters)
columns = get_columns(Jobs.__table__, required_parameters)
values: dict[str, BindParameter[Any] | datetime] = {
c.name: bindparam(c.name) for c in columns
}
Expand Down
54 changes: 52 additions & 2 deletions diracx-db/src/diracx/db/sql/pilot_agents/db.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,17 @@
from __future__ import annotations

from datetime import datetime, timezone
from typing import Any

from sqlalchemy import insert
from sqlalchemy import func, insert, select

from ..utils import BaseSQLDB
from diracx.core.exceptions import InvalidQueryError
from diracx.core.models import (
SearchSpec,
SortSpec,
)

from ..utils import BaseSQLDB, apply_search_filters, apply_sort_constraints, get_columns
from .schema import PilotAgents, PilotAgentsDBBase


Expand Down Expand Up @@ -44,3 +51,46 @@ async def add_pilot_references(
stmt = insert(PilotAgents).values(values)
await self.conn.execute(stmt)
return

async def search(
    self,
    parameters: list[str] | None,
    search: list[SearchSpec],
    sorts: list[SortSpec],
    *,
    distinct: bool = False,
    per_page: int = 100,
    page: int | None = None,
) -> tuple[int, list[dict[Any, Any]]]:
    """Search the PilotAgents table.

    :param parameters: Column names to select; all columns when falsy.
    :param search: Filters applied to the query.
    :param sorts: Sort constraints applied to the query.
    :param distinct: Select only distinct rows when true.
    :param per_page: Page size, used only when ``page`` is given.
    :param page: 1-based page number; ``None`` disables pagination.
    :return: The number of matching rows before pagination, and the selected
        rows as dictionaries.
    :raises InvalidQueryError: if ``page`` or ``per_page`` is below 1 while
        pagination is requested.
    """
    table = PilotAgents.__table__
    column_lookup = table.columns.__getitem__

    # Build the base query: projection, then filters, then ordering
    query = select(*get_columns(table, parameters))
    query = apply_search_filters(column_lookup, query, search)
    query = apply_sort_constraints(column_lookup, query, sorts)
    if distinct:
        query = query.distinct()

    # The total is counted on the unpaginated query
    count_query = select(func.count()).select_from(query.alias())
    total = (await self.conn.execute(count_query)).scalar_one()

    # Restrict to the requested page, if any
    if page is not None:
        if page < 1:
            raise InvalidQueryError("Page must be a positive integer")
        if per_page < 1:
            raise InvalidQueryError("Per page must be a positive integer")
        query = query.offset((page - 1) * per_page).limit(per_page)

    # Stream the rows and convert each one to a plain dict
    rows = await self.conn.stream(query)
    return total, [dict(row._mapping) async for row in rows]
2 changes: 2 additions & 0 deletions diracx-db/src/diracx/db/sql/utils/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
SQLDBUnavailableError,
apply_search_filters,
apply_sort_constraints,
get_columns,
)
from .functions import hash, substract_date, utcnow
from .types import Column, DateNowColumn, EnumBackedBool, EnumColumn, NullColumn
Expand All @@ -19,6 +20,7 @@
"EnumColumn",
"apply_search_filters",
"apply_sort_constraints",
"get_columns",
"substract_date",
"hash",
"SQLDBUnavailableError",
Expand Down
11 changes: 11 additions & 0 deletions diracx-db/src/diracx/db/sql/utils/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,17 @@ def find_time_resolution(value):
raise InvalidQueryError(f"Cannot parse {value=}")


def get_columns(table, parameters):
    """Return the columns of ``table``, optionally restricted to ``parameters``.

    A falsy ``parameters`` (``None`` or empty) selects every column. Otherwise
    only the columns whose name is listed are returned, in table order.

    :raises InvalidQueryError: if a requested name is not a column key of the table.
    """
    all_columns = list(table.columns)
    if not parameters:
        return all_columns

    unrecognised_parameters = set(parameters) - set(table.columns.keys())
    if unrecognised_parameters:
        raise InvalidQueryError(
            f"Unrecognised parameters requested {unrecognised_parameters}"
        )
    return [column for column in all_columns if column.name in parameters]


def apply_search_filters(column_mapping, stmt, search):
for query in search:
try:
Expand Down
4 changes: 2 additions & 2 deletions diracx-logic/src/diracx/logic/jobs/status.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@
VectorSearchSpec,
)
from diracx.db.os.job_parameters import JobParametersDB
from diracx.db.sql.job.db import JobDB, _get_columns
from diracx.db.sql.job.db import JobDB, get_columns
from diracx.db.sql.job.schema import Jobs
from diracx.db.sql.job_logging.db import JobLoggingDB
from diracx.db.sql.sandbox_metadata.db import SandboxMetadataDB
Expand Down Expand Up @@ -508,7 +508,7 @@ async def set_job_parameters_or_attributes(
):
"""Set job parameters or attributes for a list of jobs."""
attribute_columns: list[str] = [
col.name for col in _get_columns(Jobs.__table__, None)
col.name for col in get_columns(Jobs.__table__, None)
]
attribute_columns_lower: list[str] = [col.lower() for col in attribute_columns]

Expand Down
Empty file.
69 changes: 69 additions & 0 deletions diracx-logic/src/diracx/logic/pilots/logging.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
from __future__ import annotations

import logging

from diracx.core.models import LogMessage, ScalarSearchOperator, ScalarSearchSpec
from diracx.db.os.pilot_logs import PilotLogsDB, search_message
from diracx.db.sql.pilot_agents.db import PilotAgentsDB

logger = logging.getLogger(__name__)


async def send_message(
    data: LogMessage,
    pilot_logs_db: PilotLogsDB,
    pilot_agents_db: PilotAgentsDB,
) -> int:
    """Store the log lines of one pilot in the pilot logs database.

    The pilot stamp in ``data`` is resolved to a single PilotAgents row; its
    PilotID, VO and SubmissionTime are copied into every stored document.

    :param data: Pilot stamp and log lines to store.
    :param pilot_logs_db: Database receiving one document per log line.
    :param pilot_agents_db: Database used to look the pilot up by its stamp.
    :return: The PilotID the stamp resolved to.
    :raises Exception: if the stamp does not match exactly one pilot.
    """
    # Look the pilot up by its stamp; exactly one row is expected.
    stamp_filter = ScalarSearchSpec(
        parameter="PilotStamp",
        operator=ScalarSearchOperator.EQUAL,
        value=data.pilot_stamp,
    )
    total, result = await pilot_agents_db.search(
        ["PilotID", "VO", "SubmissionTime"], [stamp_filter], []
    )
    if total != 1:
        logger.error(
            "Cannot determine PilotID for requested PilotStamp: %r, (%d candidates)",
            data.pilot_stamp,
            total,
        )
        raise Exception(f"Number of rows !=1 {total}")

    pilot_row = result[0]
    pilot_id = pilot_row["PilotID"]
    vo = pilot_row["VO"]
    submission_time = pilot_row["SubmissionTime"]

    # One document per log line, each tagged with the pilot's identity.
    docs = [
        {
            "PilotStamp": data.pilot_stamp,
            "PilotID": pilot_id,
            "SubmissionTime": submission_time,
            "VO": vo,
            "LineNumber": entry.line_no,
            "Message": entry.line,
        }
        for entry in data.lines
    ]
    # Bulk insert pilot logs to the OpenSearch DB.
    target_index = pilot_logs_db.index_name(pilot_id)
    await pilot_logs_db.bulk_insert(target_index, docs)
    return pilot_id


async def get_logs(
    pilot_id: int,
    db: PilotLogsDB,
) -> list[dict]:
    """Return the stored log messages of the pilot with the given ID.

    :param pilot_id: PilotID whose log documents are searched.
    :param db: Pilot logs database to query.
    :return: The search result, or a single placeholder entry saying that no
        logs exist when the search result is empty.
    """
    pilot_filter = {"parameter": "PilotID", "operator": "eq", "value": pilot_id}
    log_entries = await search_message(db, [pilot_filter])

    if log_entries:
        return log_entries
    return [{"Message": f"No logs for pilot ID = {pilot_id}"}]
2 changes: 2 additions & 0 deletions diracx-routers/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,12 @@ auth = "diracx.routers.auth:router"
config = "diracx.routers.configuration:router"
health = "diracx.routers.health:router"
jobs = "diracx.routers.jobs:router"
pilots = "diracx.routers.pilots:router"

[project.entry-points."diracx.access_policies"]
WMSAccessPolicy = "diracx.routers.jobs.access_policies:WMSAccessPolicy"
SandboxAccessPolicy = "diracx.routers.jobs.access_policies:SandboxAccessPolicy"
PilotLogsAccessPolicy = "diracx.routers.pilots.access_policies:PilotLogsAccessPolicy"

# Minimum version of the client supported
[project.entry-points."diracx.min_client_version"]
Expand Down
8 changes: 5 additions & 3 deletions diracx-routers/src/diracx/routers/dependencies.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
"SandboxMetadataDB",
"TaskQueueDB",
"PilotAgentsDB",
"PilotLogsDB",
"add_settings_annotation",
"AvailableSecurityProperties",
)
Expand All @@ -23,6 +24,7 @@
from diracx.core.settings import DevelopmentSettings as _DevelopmentSettings
from diracx.core.settings import SandboxStoreSettings as _SandboxStoreSettings
from diracx.db.os import JobParametersDB as _JobParametersDB
from diracx.db.os import PilotLogsDB as _PilotLogsDB
from diracx.db.sql import AuthDB as _AuthDB
from diracx.db.sql import JobDB as _JobDB
from diracx.db.sql import JobLoggingDB as _JobLoggingDB
Expand All @@ -38,7 +40,7 @@ def add_settings_annotation(cls: T) -> T:
return Annotated[cls, Depends(cls.create)] # type: ignore


# Databases
# SQL Databases
AuthDB = Annotated[_AuthDB, Depends(_AuthDB.transaction)]
JobDB = Annotated[_JobDB, Depends(_JobDB.transaction)]
JobLoggingDB = Annotated[_JobLoggingDB, Depends(_JobLoggingDB.transaction)]
Expand All @@ -48,9 +50,9 @@ def add_settings_annotation(cls: T) -> T:
]
TaskQueueDB = Annotated[_TaskQueueDB, Depends(_TaskQueueDB.transaction)]

# Opensearch databases
# OpenSearch Databases
JobParametersDB = Annotated[_JobParametersDB, Depends(_JobParametersDB.session)]

PilotLogsDB = Annotated[_PilotLogsDB, Depends(_PilotLogsDB.session)]

# Miscellaneous
Config = Annotated[_Config, Depends(ConfigSource.create)]
Expand Down
Loading
Loading