Skip to content

Commit

Permalink
Merge pull request #7874 from fstagni/80_force_flag_TS_Cleaning
Browse files: browse the repository at this point in the history
[8.0] the TransformationCleaningAgent forces jobs to be killed
  • Loading branch information
fstagni authored Nov 20, 2024
2 parents b99d998 + 1af2e4d commit da6ec19
Show file tree
Hide file tree
Showing 4 changed files with 25 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,7 @@
import errno
import os
import re
import time
from datetime import datetime, timedelta
from hashlib import md5

# # from DIRAC
from DIRAC import S_ERROR, S_OK
Expand Down Expand Up @@ -655,7 +653,7 @@ def __removeWMSTasks(self, transJobIDs):
jobIDs = [int(j) for j in transJobIDs if int(j)]
allRemove = True
for jobList in breakListIntoChunks(jobIDs, 500):
res = self.wmsClient.killJob(jobList)
res = self.wmsClient.killJob(jobList, force=True)
if res["OK"]:
self.log.info(f"Successfully killed {len(jobList)} jobs from WMS")
elif ("InvalidJobIDs" in res) and ("NonauthorizedJobIDs" not in res) and ("FailedJobIDs" not in res):
Expand Down Expand Up @@ -721,6 +719,11 @@ def __submitRemovalRequests(self, lfns, transID=0):
:param int transID: transformationID, only used in RequestName
:returns: S_ERROR/S_OK
"""

# These imports are used only in this function
import time
from hashlib import md5

for index, lfnList in enumerate(breakListIntoChunks(lfns, 300)):
oRequest = Request()
requestName = "TCA_{transID}_{index}_{md5(repr(time.time()).encode()).hexdigest()[:5]}"
Expand Down
8 changes: 4 additions & 4 deletions src/DIRAC/WorkloadManagementSystem/Client/WMSClient.py
Original file line number Diff line number Diff line change
Expand Up @@ -212,17 +212,17 @@ def submitJob(self, jdl, jobDescriptionObject=None):

return result

def killJob(self, jobID):
def killJob(self, jobID, force=False):
"""Kill running job.
jobID can be an integer representing a single DIRAC job ID or a list of IDs
"""
return self.jobManager.killJob(jobID)
return self.jobManager.killJob(jobID, force=force)

def deleteJob(self, jobID):
def deleteJob(self, jobID, force=False):
"""Delete job(s) (set their status to DELETED) from the WMS Job database.
jobID can be an integer representing a single DIRAC job ID or a list of IDs
"""
return self.jobManager.deleteJob(jobID)
return self.jobManager.deleteJob(jobID, force=force)

def removeJob(self, jobID):
"""Fully remove job(s) from the WMS Job database.
Expand Down
4 changes: 2 additions & 2 deletions src/DIRAC/WorkloadManagementSystem/DB/JobDB.py
Original file line number Diff line number Diff line change
Expand Up @@ -600,7 +600,7 @@ def setJobsMajorStatus(self, jIDList, candidateStatus, force=False):

return self._update(cmd)

def setJobStatus(self, jobID, status="", minorStatus="", applicationStatus=""):
def setJobStatus(self, jobID, status="", minorStatus="", applicationStatus="", force=False):
"""Set status of the job specified by its jobID"""
# Do not update the LastUpdate time stamp if setting the Stalled status
update_flag = True
Expand All @@ -619,7 +619,7 @@ def setJobStatus(self, jobID, status="", minorStatus="", applicationStatus=""):
attrNames.append("ApplicationStatus")
attrValues.append(applicationStatus[:255])

result = self.setJobAttributes(jobID, attrNames, attrValues, update=update_flag)
result = self.setJobAttributes(jobID, attrNames, attrValues, update=update_flag, force=force)
if not result["OK"]:
return result

Expand Down
24 changes: 13 additions & 11 deletions src/DIRAC/WorkloadManagementSystem/Service/JobManagerHandler.py
Original file line number Diff line number Diff line change
Expand Up @@ -432,14 +432,14 @@ def export_removeJob(self, jobIDs):

return S_OK(validJobList)

def __deleteJob(self, jobID):
def __deleteJob(self, jobID, force=False):
"""Set the job status to "Deleted"
and remove the pilot that ran and its logging info if the pilot is finished.
:param int jobID: job ID
:return: S_OK()/S_ERROR()
"""
result = self.jobDB.setJobStatus(jobID, JobStatus.DELETED, "Checking accounting")
result = self.jobDB.setJobStatus(jobID, JobStatus.DELETED, "Checking accounting", force=force)
if not result["OK"]:
return result

Expand Down Expand Up @@ -475,7 +475,7 @@ def __deleteJob(self, jobID):

return S_OK()

def __killJob(self, jobID, sendKillCommand=True):
def __killJob(self, jobID, sendKillCommand=True, force=False):
"""Kill one job
:param int jobID: job ID
Expand All @@ -488,14 +488,16 @@ def __killJob(self, jobID, sendKillCommand=True):
return result

self.log.info("Job marked for termination", jobID)
if not (result := self.jobDB.setJobStatus(jobID, JobStatus.KILLED, "Marked for termination"))["OK"]:
if not (result := self.jobDB.setJobStatus(jobID, JobStatus.KILLED, "Marked for termination", force=force))[
"OK"
]:
self.log.warn("Failed to set job Killed status", result["Message"])
if not (result := self.taskQueueDB.deleteJob(jobID))["OK"]:
self.log.warn("Failed to delete job from the TaskQueue", result["Message"])

return S_OK()

def _kill_delete_jobs(self, jobIDList, right):
def _kill_delete_jobs(self, jobIDList, right, force=False):
"""Kill (== set the status to "KILLED") or delete (== set the status to "DELETED") jobs as necessary
:param list jobIDList: job IDs
Expand Down Expand Up @@ -535,12 +537,12 @@ def _kill_delete_jobs(self, jobIDList, right):
stagingJobList = [jobID for jobID, sDict in result["Value"].items() if sDict["Status"] == JobStatus.STAGING]

for jobID in killJobList:
result = self.__killJob(jobID)
result = self.__killJob(jobID, force=force)
if not result["OK"]:
badIDs.append(jobID)

for jobID in deleteJobList:
result = self.__deleteJob(jobID)
result = self.__deleteJob(jobID, force=force)
if not result["OK"]:
badIDs.append(jobID)

Expand Down Expand Up @@ -573,28 +575,28 @@ def _kill_delete_jobs(self, jobIDList, right):
###########################################################################
types_deleteJob = []

def export_deleteJob(self, jobIDs):
def export_deleteJob(self, jobIDs, force=False):
"""Delete jobs specified in the jobIDs list
:param list jobIDs: list of job IDs
:return: S_OK/S_ERROR
"""

return self._kill_delete_jobs(jobIDs, RIGHT_DELETE)
return self._kill_delete_jobs(jobIDs, RIGHT_DELETE, force=force)

###########################################################################
types_killJob = []

def export_killJob(self, jobIDs):
def export_killJob(self, jobIDs, force=False):
"""Kill jobs specified in the jobIDs list
:param list jobIDs: list of job IDs
:return: S_OK/S_ERROR
"""

return self._kill_delete_jobs(jobIDs, RIGHT_KILL)
return self._kill_delete_jobs(jobIDs, RIGHT_KILL, force=force)

###########################################################################
types_resetJob = []
Expand Down

0 comments on commit da6ec19

Please sign in to comment.