Skip to content

Commit d235a78

Browse files
committed
refactor: remove use of JobMonitoringClient from JobStatus
1 parent dc1f63e commit d235a78

File tree

3 files changed

+25
-47
lines changed

3 files changed

+25
-47
lines changed

src/DIRAC/Interfaces/API/Dirac.py

Lines changed: 22 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@
4646
from DIRAC.WorkloadManagementSystem.Client.JobMonitoringClient import JobMonitoringClient
4747
from DIRAC.WorkloadManagementSystem.Client.SandboxStoreClient import SandboxStoreClient
4848
from DIRAC.WorkloadManagementSystem.Client.WMSClient import WMSClient
49+
from DIRAC.WorkloadManagementSystem.Utilities.jobAdministration import _filterJobStateTransition
4950

5051

5152
def parseArguments(args):
@@ -1450,10 +1451,13 @@ def deleteJob(self, jobID):
14501451
# Remove any job IDs that can't change to the Killed or Deleted states
14511452
filteredJobs = set()
14521453
for filterState in (JobStatus.KILLED, JobStatus.DELETED):
1453-
filterRes = JobStatus.filterJobStateTransition(jobIDs, filterState)
1454-
if not filterRes["OK"]:
1455-
return filterRes
1456-
filteredJobs.update(filterRes["Value"])
1454+
# get a dictionary of jobID:status
1455+
res = JobMonitoringClient().getJobsStatus(jobIDs)
1456+
if not res["OK"]:
1457+
return res
1458+
js = {k: v["Status"] for k, v in res["Value"].items()}
1459+
# then filter
1460+
filteredJobs.update(_filterJobStateTransition(js, filterState))
14571461

14581462
return WMSClient(useCertificates=self.useCertificates).deleteJob(list(filteredJobs))
14591463

@@ -1480,11 +1484,13 @@ def rescheduleJob(self, jobID):
14801484
return ret
14811485
jobIDs = ret["Value"]
14821486

1483-
# Remove any job IDs that can't change to the rescheduled state
1484-
filterRes = JobStatus.filterJobStateTransition(jobIDs, JobStatus.RESCHEDULED)
1485-
if not filterRes["OK"]:
1486-
return filterRes
1487-
jobIDsToReschedule = filterRes["Value"]
1487+
# get a dictionary of jobID:status
1488+
res = JobMonitoringClient().getJobsStatus(jobIDs)
1489+
if not res["OK"]:
1490+
return res
1491+
js = {k: v["Status"] for k, v in res["Value"].items()}
1492+
# then filter
1493+
jobIDsToReschedule = _filterJobStateTransition(js, JobStatus.RESCHEDULED)
14881494

14891495
return WMSClient(useCertificates=self.useCertificates).rescheduleJob(jobIDsToReschedule)
14901496

@@ -1510,10 +1516,13 @@ def killJob(self, jobID):
15101516
# Remove any job IDs that can't change to the Killed or Deleted states
15111517
filteredJobs = set()
15121518
for filterState in (JobStatus.KILLED, JobStatus.DELETED):
1513-
filterRes = JobStatus.filterJobStateTransition(jobIDs, filterState)
1514-
if not filterRes["OK"]:
1515-
return filterRes
1516-
filteredJobs.update(filterRes["Value"])
1519+
# get a dictionary of jobID:status
1520+
res = JobMonitoringClient().getJobsStatus(jobIDs)
1521+
if not res["OK"]:
1522+
return res
1523+
js = {k: v["Status"] for k, v in res["Value"].items()}
1524+
# then filter
1525+
filteredJobs.update(_filterJobStateTransition(js, filterState))
15171526

15181527
return WMSClient(useCertificates=self.useCertificates).killJob(list(filteredJobs))
15191528

src/DIRAC/WorkloadManagementSystem/Client/JobStatus.py

Lines changed: 0 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,7 @@
22
This module contains constants and lists for the possible job states.
33
"""
44

5-
from DIRAC import S_OK
65
from DIRAC.Core.Utilities.StateMachine import State, StateMachine
7-
from DIRAC.WorkloadManagementSystem.Client.JobMonitoringClient import JobMonitoringClient
86

97
#:
108
SUBMITTING = "Submitting"
@@ -95,26 +93,3 @@ def __init__(self, state):
9593
RECEIVED: State(1, [SCOUTING, CHECKING, STAGING, WAITING, FAILED, DELETED, KILLED], defState=RECEIVED),
9694
SUBMITTING: State(0, [RECEIVED, CHECKING, DELETED, KILLED], defState=SUBMITTING), # initial state
9795
}
98-
99-
100-
def filterJobStateTransition(jobIDs, candidateState):
101-
"""Given a list of jobIDs, return a list that are allowed to transition
102-
to the given candidate state.
103-
"""
104-
allowedJobs = []
105-
106-
if not isinstance(jobIDs, list):
107-
jobIDs = [jobIDs]
108-
109-
res = JobMonitoringClient().getJobsStatus(jobIDs)
110-
if not res["OK"]:
111-
return res
112-
113-
for jobID in jobIDs:
114-
if jobID in res["Value"]:
115-
curState = res["Value"][jobID]["Status"]
116-
stateRes = JobsStateMachine(curState).getNextState(candidateState)
117-
if stateRes["OK"]:
118-
if stateRes["Value"] == candidateState:
119-
allowedJobs.append(jobID)
120-
return S_OK(allowedJobs)

src/DIRAC/WorkloadManagementSystem/Utilities/jobAdministration.py

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -84,17 +84,11 @@ def kill_delete_jobs(right, validJobList, nonauthJobList=[], force=False):
8484
jobStates = result["Value"]
8585

8686
# Get the jobs allowed to transition to the Killed state
87-
filterRes = _filterJobStateTransition(jobStates, JobStatus.KILLED)
88-
if not filterRes["OK"]:
89-
return filterRes
90-
killJobList.extend(filterRes["Value"])
87+
killJobList.extend(_filterJobStateTransition(jobStates, JobStatus.KILLED))
9188

9289
if right == RIGHT_DELETE:
9390
# Get the jobs allowed to transition to the Deleted state
94-
filterRes = _filterJobStateTransition(jobStates, JobStatus.DELETED)
95-
if not filterRes["OK"]:
96-
return filterRes
97-
deleteJobList.extend(filterRes["Value"])
91+
deleteJobList.extend(_filterJobStateTransition(jobStates, JobStatus.DELETED))
9892

9993
for jobID in killJobList:
10094
result = _killJob(jobID, force=force)
@@ -141,4 +135,4 @@ def _filterJobStateTransition(jobStates, candidateState):
141135
if stateRes["OK"]:
142136
if stateRes["Value"] == candidateState:
143137
allowedJobs.append(js[0])
144-
return S_OK(allowedJobs)
138+
return allowedJobs

0 commit comments

Comments
 (0)