Skip to content

[9.0] feat: use summary tables and triggers #8199

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 21 commits into
base: integration
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
5a8328d
feat: use summary table and triggers (PilotAgentsDB.PilotAgents, summ…
fstagni May 19, 2025
7ffe0c3
feat: added VO to PilotsHistory Monitoring
fstagni May 19, 2025
8a62c6c
fix: added exec_mysql_root command
fstagni May 20, 2025
6646981
feat: handling DELIMETER in sql files
fstagni May 20, 2025
413ace8
test: adding tests for Pilots summary tables
fstagni May 20, 2025
a222b40
feat: use summary table and triggers (JobDB.Jobs, summaries for State…
fstagni May 20, 2025
6424240
test: adding tests for Jobs summary tables
fstagni May 20, 2025
434855d
fix: added VO to WMSHistory Monitoring
fstagni May 20, 2025
e70fdf2
fix: tests and other minor fixes
fstagni May 26, 2025
57245b6
fix: much faster query of selectors for web
fstagni May 26, 2025
8f03dec
fix: TransformationCleaningAgent calls directly JobDB
fstagni May 26, 2025
ecf6f18
fix: faster queries for DISTINCT values
fstagni May 26, 2025
b885720
fix: VO as VARCHAR(64) everywhere
fstagni May 26, 2025
fdb5f49
fix: removed last instance of getDistinctJobAttributes
fstagni May 26, 2025
823401d
fix: avoid full table scan when deleting
fstagni May 27, 2025
f4be6bb
test: compress csv files
fstagni May 27, 2025
4fe674d
fix: use ComputingElement instead of DestinationSite
fstagni Jun 17, 2025
3586349
fix: restored use of GridType
fstagni Jun 17, 2025
a2ec590
fix: fixes to StatesAccountingAgent for new fields
fstagni Jun 19, 2025
e20008c
fix: ID as INT UNSIGNED
fstagni Jun 26, 2025
cbe0381
docs: minor doc update on StatesAccountingAgent
fstagni Jun 26, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
Systems / WorkloadManagement / <INSTANCE> / Agents / StatesAccountingAgent - Sub-subsection
===========================================================================================

StatesAccountingAgent sends periodically numbers of jobs in various states for various sites to the
Monitoring system to create historical plots.
StatesAccountingAgent sends counts of jobs in various states for various sites to the
Monitoring system exactly every 15 minutes, to create monitoring and historical plots.

This agent doesn't have special options to configure.
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,6 @@ Systems / WorkloadManagement / <INSTANCE> / Agents - Sub-subsection

In this subsection each agent is described.

+----------+----------------------------------+----------------+
| **Name** | **Description** | **Example** |
+----------+----------------------------------+----------------+
| *Agent* | Subsection named as the agent is | InputDataAgent |
| | called. | |
+----------+----------------------------------+----------------+

Common options for all the agents are described in the table below:

+---------------------+---------------------------------------+------------------------------+
Expand Down
14 changes: 14 additions & 0 deletions integration_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -509,6 +509,20 @@ def exec_mysql():
os.execvp(cmd[0], cmd)


@app.command()
def exec_mysql_root():
    """Start an interactive MySQL session as the root user in the mysql container."""
    import shlex

    _check_containers_running()
    cmd = _build_docker_cmd("mysql", use_root=True, cwd="/")
    # The command line below is interpreted by ``bash -c``: quote the credentials
    # so that shell metacharacters in the password cannot break the invocation.
    user = shlex.quote(str(DB_ROOTUSER))
    password = shlex.quote(str(DB_ROOTPWD))
    cmd += [
        "bash",
        "-c",
        f"exec mysql --user={user} --password={password}",
    ]
    typer.secho("Opening MySQL root prompt inside mysql container", err=True, fg=c.GREEN)
    # Replace the current process so the interactive session owns the terminal.
    os.execvp(cmd[0], cmd)


@app.command()
def list_services():
"""List the services which have been running.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,9 @@ def setValuesFromDict(self, dataDict):
if errKeys:
return S_ERROR(f"Key(s) {', '.join(errKeys)} are not valid")
for key in dataDict:
self.setValueByKey(key, dataDict[key])
res = self.setValueByKey(key, dataDict[key])
if not res["OK"]:
return res
return S_OK()

def getValue(self, key):
Expand Down
41 changes: 33 additions & 8 deletions src/DIRAC/FrameworkSystem/Client/ComponentInstaller.py
Original file line number Diff line number Diff line change
Expand Up @@ -2132,14 +2132,39 @@ def installDatabase(self, dbName):
try:
cmdLines = self._createMySQLCMDLines(dbSql)

# We need to run one SQL cmd at once, mysql is much happier that way.
# Create a string of commands, ignoring comment lines
sqlString = "\n".join(x for x in cmdLines if not x.startswith("--"))

# Now run each command (They are seperated by ;)
# Ignore any empty ones
cmds = [x.strip() for x in sqlString.split(";") if x.strip()]
for cmd in cmds:
# Now run each command (they are separated by ";" or by a custom DELIMITER)
# We need to split the string into commands, and ignore any empty ones

# Handle DELIMITER statements in SQL
delimiter = ";"
commands = []
current_command = []
for line in cmdLines:
if line.startswith("--"):
continue
if line.startswith("DELIMITER "):
delimiter = line.split("DELIMITER ", 1)[1].strip()
continue
if delimiter != ";":
if line == delimiter:
commands.append("\n".join(current_command).strip())
current_command = []
else:
current_command.append(line)
else:
if line.endswith(";"):
current_command.append(line[:-1])
commands.append("\n".join(current_command).strip())
current_command = []
else:
current_command.append(line)
if current_command:
commands.append("\n".join(current_command).strip())

# Remove empty commands
commands = [cmd for cmd in commands if cmd]

for cmd in commands:
result = self.execMySQL(cmd, dbName)
if not result["OK"]:
error = "Failed to initialize Database"
Expand Down
4 changes: 3 additions & 1 deletion src/DIRAC/MonitoringSystem/Client/Types/PilotsHistory.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ def __init__(self):

super().__init__()

self.keyFields = ["GridSite", "GridType", "Status"]
self.keyFields = ["GridSite", "ComputingElement", "GridType", "Status", "VO"]

self.monitoringFields = ["NumOfPilots"]

Expand All @@ -26,8 +26,10 @@ def __init__(self):
self.addMapping(
{
"GridSite": {"type": "keyword"},
"ComputingElement": {"type": "keyword"},
"GridType": {"type": "keyword"},
"Status": {"type": "keyword"},
"VO": {"type": "keyword"},
"NumOfPilots": {"type": "long"},
}
)
Expand Down
2 changes: 2 additions & 0 deletions src/DIRAC/MonitoringSystem/Client/Types/WMSHistory.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ def __init__(self):
"User",
"UserGroup",
"JobGroup",
"VO",
"MinorStatus",
"ApplicationStatus",
"JobSplitType",
Expand All @@ -47,6 +48,7 @@ def __init__(self):
"MinorStatus": {"type": "keyword"},
"User": {"type": "keyword"},
"JobGroup": {"type": "keyword"},
"VO": {"type": "keyword"},
"UserGroup": {"type": "keyword"},
"Tier": {"type": "keyword"},
"Type": {"type": "keyword"},
Expand Down
30 changes: 15 additions & 15 deletions src/DIRAC/MonitoringSystem/Service/WebAppHandler.py
Original file line number Diff line number Diff line change
Expand Up @@ -336,25 +336,25 @@ def export_getSiteSummarySelectors(cls):
types_getApplicationStates = []

@classmethod
def export_getApplicationStates(cls, condDict=None, older=None, newer=None):
def export_getApplicationStates(cls):
"""Return Distinct Values of ApplicationStatus job Attribute in WMS"""
return cls.jobDB.getDistinctJobAttributes("ApplicationStatus", condDict, older, newer)
return cls.jobDB._query("SELECT DISTINCT ApplicationStatus FROM JobsHistorySummary")

types_getJobTypes = []

@classmethod
def export_getJobTypes(cls, condDict=None, older=None, newer=None):
def export_getJobTypes(cls):
"""Return Distinct Values of JobType job Attribute in WMS"""
return cls.jobDB.getDistinctJobAttributes("JobType", condDict, older, newer)
return cls.jobDB._query("SELECT DISTINCT JobType FROM JobsHistorySummary")

types_getOwners = []

@classmethod
def export_getOwners(cls, condDict=None, older=None, newer=None):
def export_getOwners(cls):
"""
Return Distinct Values of Owner job Attribute in WMS
"""
return cls.jobDB.getDistinctJobAttributes("Owner", condDict, older, newer)
return cls.jobDB._query("SELECT DISTINCT Owner FROM JobsHistorySummary")

types_getOwnerGroup = []

Expand All @@ -363,43 +363,43 @@ def export_getOwnerGroup(cls):
"""
Return Distinct Values of OwnerGroup from the JobDB
"""
return cls.jobDB.getDistinctJobAttributes("OwnerGroup")
return cls.jobDB._query("SELECT DISTINCT OwnerGroup FROM JobsHistorySummary")

types_getJobGroups = []

@classmethod
def export_getJobGroups(cls, condDict=None, older=None, cutDate=None):
def export_getJobGroups(cls):
"""
Return Distinct Values of ProductionId job Attribute in WMS
"""
return cls.jobDB.getDistinctJobAttributes("JobGroup", condDict, older, newer=cutDate)
return cls.jobDB._query("SELECT DISTINCT JobGroup FROM JobsHistorySummary")

types_getSites = []

@classmethod
def export_getSites(cls, condDict=None, older=None, newer=None):
def export_getSites(cls):
"""
Return Distinct Values of Site job Attribute in WMS
"""
return cls.jobDB.getDistinctJobAttributes("Site", condDict, older, newer)
return cls.jobDB._query("SELECT DISTINCT Site FROM JobsHistorySummary")

types_getStates = []

@classmethod
def export_getStates(cls, condDict=None, older=None, newer=None):
def export_getStates(cls):
"""
Return Distinct Values of Status job Attribute in WMS
"""
return cls.jobDB.getDistinctJobAttributes("Status", condDict, older, newer)
return cls.jobDB._query("SELECT DISTINCT Status FROM JobsHistorySummary")

types_getMinorStates = []

@classmethod
def export_getMinorStates(cls, condDict=None, older=None, newer=None):
def export_getMinorStates(cls):
"""
Return Distinct Values of Minor Status job Attribute in WMS
"""
return cls.jobDB.getDistinctJobAttributes("MinorStatus", condDict, older, newer)
return cls.jobDB._query("SELECT DISTINCT MinorStatus FROM JobsHistorySummary")

##############################################################################
# Transformations
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
from DIRAC.Resources.Storage.StorageElement import StorageElement
from DIRAC.TransformationSystem.Client import TransformationStatus
from DIRAC.TransformationSystem.Client.TransformationClient import TransformationClient
from DIRAC.WorkloadManagementSystem.Client.JobMonitoringClient import JobMonitoringClient
from DIRAC.WorkloadManagementSystem.Client.WMSClient import WMSClient
from DIRAC.WorkloadManagementSystem.DB.JobDB import JobDB

Expand All @@ -44,7 +43,6 @@ class TransformationCleaningAgent(AgentModule):
"""
.. class:: TransformationCleaningAgent

:param ~DIRAC.DataManagementSystem.Client.DataManager.DataManager dm: DataManager instance
:param ~TransformationClient.TransformationClient transClient: TransformationClient instance
:param ~FileCatalogClient.FileCatalogClient metadataClient: FileCatalogClient instance

Expand Down Expand Up @@ -126,8 +124,6 @@ def initialize(self):
self.reqClient = ReqClient()
# # file catalog client
self.metadataClient = FileCatalogClient()
# # job monitoring client
self.jobMonitoringClient = JobMonitoringClient()
# # job DB
self.jobDB = JobDB()

Expand Down Expand Up @@ -227,7 +223,14 @@ def finalize(self):
So, we should just clean from time to time.
What I added here is done only when the agent finalize, and it's quite light-ish operation anyway.
"""
res = self.jobDB.getDistinctJobAttributes("JobGroup", None, datetime.utcnow() - timedelta(days=365))

res = self.jobDB.getDistinctAttributeValues(
"Jobs",
"JobGroup",
older=datetime.utcnow() - timedelta(days=365),
timeStamp="LastUpdateTime",
)

if not res["OK"]:
self.log.error("Failed to get job groups", res["Message"])
return res
Expand Down Expand Up @@ -271,7 +274,7 @@ def finalize(self):

# Remove JobIDs that were unknown to the TransformationSystem
jobGroupsToCheck = [str(transDict["TransformationID"]).zfill(8) for transDict in toClean + toArchive]
res = self.jobMonitoringClient.getJobs({"JobGroup": jobGroupsToCheck})
res = self.jobDB.selectJobs({"JobGroup": jobGroupsToCheck})
if not res["OK"]:
return res
jobIDsToRemove = [int(jobID) for jobID in res["Value"]]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ def initialize(self):

def _getAllowedJobTypes(self):
"""Get valid jobTypes"""
result = self.jobDB.getDistinctJobAttributes("JobType")
result = self.jobDB._query("SELECT DISTINCT JobType FROM JobsHistorySummary")
if not result["OK"]:
return result
cleanJobTypes = []
Expand Down
42 changes: 23 additions & 19 deletions src/DIRAC/WorkloadManagementSystem/Agent/StatesAccountingAgent.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,19 +32,25 @@ class StatesAccountingAgent(AgentModule):
__summaryKeyFieldsMapping = [
"Status",
"Site",
"User",
"UserGroup",
"Owner",
"OwnerGroup",
"JobGroup",
"VO",
"JobType",
"ApplicationStatus",
"MinorStatus",
]
__summaryDefinedFields = [("ApplicationStatus", "unset"), ("MinorStatus", "unset")]
__summaryValueFieldsMapping = ["Jobs", "Reschedules"]
__renameFieldsMapping = {"JobType": "JobSplitType"}
__summaryValueFieldsMapping = ["JobCount", "RescheduleSum"]
__renameFieldsMapping = {
"Owner": "User",
"OwnerGroup": "UserGroup",
"JobType": "JobSplitType",
"JobCount": "Jobs",
"RescheduleSum": "Reschedules",
}

# PilotsHistory fields
__pilotsMapping = ["GridSite", "GridType", "Status", "NumOfPilots"]
__pilotsMapping = ["GridSite", "ComputingElement", "GridType", "Status", "VO", "NumOfPilots"]

def initialize(self):
"""Standard initialization"""
Expand Down Expand Up @@ -88,15 +94,16 @@ def execute(self):
# PilotsHistory to Monitoring
if "Monitoring" in self.pilotMonitoringOption:
self.log.info("Committing PilotsHistory to Monitoring")
result = PilotAgentsDB().getSummarySnapshot()
sql = "SELECT * FROM PilotsHistorySummary ORDER BY GridSite, ComputingElement, GridType, Status, VO;"
result = PilotAgentsDB()._query(sql)
now = datetime.datetime.utcnow()
if not result["OK"]:
self.log.error(
"Can't get the PilotAgentsDB summary",
f"{result['Message']}: won't commit PilotsHistory at this cycle",
)

values = result["Value"][1]
values = result["Value"]
for record in values:
rD = {}
for iP, _ in enumerate(self.__pilotsMapping):
Expand All @@ -112,25 +119,20 @@ def execute(self):

# WMSHistory to Monitoring or Accounting
self.log.info(f"Committing WMSHistory to {'and '.join(self.jobMonitoringOption)} backend")
result = JobDB().getSummarySnapshot(self.__jobDBFields)
now = datetime.datetime.utcnow()
result = JobDB()._query(
f"SELECT {','.join(self.__summaryKeyFieldsMapping + self.__summaryValueFieldsMapping)} FROM JobsHistorySummary ORDER BY {','.join(self.__summaryKeyFieldsMapping)}"
)
if not result["OK"]:
self.log.error("Can't get the JobDB summary", f"{result['Message']}: won't commit WMSHistory at this cycle")
return S_ERROR()

values = result["Value"][1]

values = result["Value"]
now = datetime.datetime.utcnow()
self.log.info("Start sending WMSHistory records")
for record in values:
rD = {}
for fV in self.__summaryDefinedFields:
rD[fV[0]] = fV[1]
for iP, _ in enumerate(self.__summaryKeyFieldsMapping):
fieldName = self.__summaryKeyFieldsMapping[iP]
for iP, fieldName in enumerate(self.__summaryKeyFieldsMapping + self.__summaryValueFieldsMapping):
rD[self.__renameFieldsMapping.get(fieldName, fieldName)] = record[iP]
record = record[len(self.__summaryKeyFieldsMapping) :]
for iP, _ in enumerate(self.__summaryValueFieldsMapping):
rD[self.__summaryValueFieldsMapping[iP]] = int(record[iP])

for backend in self.datastores:
if backend.lower() == "monitoring":
Expand All @@ -141,6 +143,8 @@ def execute(self):
self.datastores["Monitoring"].addRecord(rD)

elif backend.lower() == "accounting":
rD.pop("VO") # Remove VO field for Accounting
rD.pop("ApplicationStatus") # Remove ApplicationStatus for Accounting
acWMS = WMSHistory()
acWMS.setStartTime(now)
acWMS.setEndTime(now)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,7 @@ def jca(mocker):
create=True,
)
mocker.patch("DIRAC.WorkloadManagementSystem.Agent.JobCleaningAgent.AgentModule.am_getOption", return_value=mockAM)
mocker.patch(
"DIRAC.WorkloadManagementSystem.Agent.JobCleaningAgent.JobDB.getDistinctJobAttributes", side_effect=mockReply
)
mocker.patch("DIRAC.WorkloadManagementSystem.Agent.JobCleaningAgent.JobDB._query", side_effect=mockReply)
mocker.patch("DIRAC.WorkloadManagementSystem.Agent.JobCleaningAgent.JobDB.selectJobs", side_effect=mockReply)
mocker.patch("DIRAC.WorkloadManagementSystem.Agent.JobCleaningAgent.JobDB.__init__", side_effect=mockNone)
mocker.patch("DIRAC.WorkloadManagementSystem.Agent.JobCleaningAgent.ReqClient", return_value=mockNone)
Expand Down
Loading
Loading