Skip to content

Commit

Permalink
Merge pull request #7866 from fstagni/80_bulkInsertJobLoggingDB
Browse files Browse the repository at this point in the history
[8.0] adding possibility to bulk insert in JobLoggingDB
  • Loading branch information
fstagni authored Nov 20, 2024
2 parents da6ec19 + cd14009 commit ec01262
Show file tree
Hide file tree
Showing 5 changed files with 129 additions and 29 deletions.
46 changes: 40 additions & 6 deletions src/DIRAC/Core/Utilities/MySQL.py
Original file line number Diff line number Diff line change
Expand Up @@ -529,20 +529,19 @@ def __del__(self):
except Exception:
pass

def _except(self, methodName, x, err, cmd="", print=True):
def _except(self, methodName, x, err, cmd="", debug=True):
"""
print MySQL error or exception
return S_ERROR with Exception
"""

try:
raise x
except MySQLdb.Error as e:
if print:
if debug:
self.log.error(f"{methodName} ({self._safeCmd(cmd)}): {err}", "%d: %s" % (e.args[0], e.args[1]))
return S_ERROR(DErrno.EMYSQL, "%s: ( %d: %s )" % (err, e.args[0], e.args[1]))
except Exception as e:
if print:
if debug:
self.log.error(f"{methodName} ({self._safeCmd(cmd)}): {err}", repr(e))
return S_ERROR(DErrno.EMYSQL, f"{err}: ({repr(e)})")

Expand Down Expand Up @@ -759,8 +758,8 @@ def _update(self, cmd, *, args=None, conn=None, debug=True):
:param conn: connection object.
:param debug: print or not the errors
return S_OK with number of updated registers upon success
return S_ERROR upon error
:return: S_OK with number of updated registers upon success.
S_ERROR upon error.
"""

self.log.debug(f"_update: {self._safeCmd(cmd)}")
Expand Down Expand Up @@ -788,6 +787,41 @@ def _update(self, cmd, *, args=None, conn=None, debug=True):

return retDict

@captureOptimizerTraces
def _updatemany(self, cmd, data, *, conn=None, debug=True):
"""execute MySQL updatemany command
:param debug: print or not the errors
:return: S_OK with number of updated registers upon success.
S_ERROR upon error.
"""

self.log.debug(f"_updatemany: {self._safeCmd(cmd)}")
if conn:
connection = conn
else:
retDict = self._getConnection()
if not retDict["OK"]:
return retDict
connection = retDict["Value"]

try:
cursor = connection.cursor()
res = cursor.executemany(cmd, data)
retDict = S_OK(res)
if cursor.lastrowid:
retDict["lastRowId"] = cursor.lastrowid
except Exception as x:
retDict = self._except("_updatemany", x, "Execution failed.", cmd, debug)

try:
cursor.close()
except Exception:
pass

return retDict

def _transaction(self, cmdList, conn=None):
"""dummy transaction support
Expand Down
6 changes: 6 additions & 0 deletions src/DIRAC/WorkloadManagementSystem/DB/JobDB.py
Original file line number Diff line number Diff line change
Expand Up @@ -448,6 +448,9 @@ def setJobAttribute(self, jobID, attrName, attrValue, update=False, myDate=None,
:return: S_OK/S_ERROR
"""

if not jobID:
return S_OK()

if attrName not in self.jobAttributeNames:
return S_ERROR(EWMSJMAN, "Request to set non-existing job attribute")

Expand Down Expand Up @@ -505,6 +508,9 @@ def setJobAttributes(self, jobID, attrNames, attrValues, update=False, myDate=No
:return: S_OK/S_ERROR
"""

if not jobID:
return S_OK()

jobIDList = jobID
if not isinstance(jobID, (list, tuple)):
jobIDList = [jobID]
Expand Down
63 changes: 45 additions & 18 deletions src/DIRAC/WorkloadManagementSystem/DB/JobLoggingDB.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,10 @@
getWMSTimeStamps()
"""
import datetime
import time

from DIRAC import S_OK, S_ERROR
from DIRAC.Core.Utilities import TimeUtilities
from DIRAC import S_ERROR, S_OK
from DIRAC.Core.Base.DB import DB
from DIRAC.Core.Utilities import TimeUtilities

MAGIC_EPOC_NUMBER = 1270000000

Expand Down Expand Up @@ -56,33 +55,61 @@ def addLoggingRecord(
event = f"status/minor/app={status}/{minorStatus}/{applicationStatus}"
self.log.info("Adding record for job ", str(jobID) + ": '" + event + "' from " + source)

try:
def _get_date(date):
# We need to specify that timezone is UTC because otherwise timestamp
# assumes local time while we mean UTC.
if not date:
# Make the UTC datetime string and float
_date = datetime.datetime.utcnow()
# Make the UTC datetime
return datetime.datetime.utcnow()
elif isinstance(date, str):
# The date is provided as a string in UTC
_date = TimeUtilities.fromString(date)
return TimeUtilities.fromString(date)
elif isinstance(date, datetime.datetime):
_date = date
return date
else:
raise Exception("Incorrect date for the logging record")

try:
if isinstance(date, list):
_date = []
for d in date:
try:
_date.append(_get_date(d))
except Exception:
self.log.exception("Exception while date evaluation")
_date.append(datetime.datetime.utcnow())
else:
self.log.error("Incorrect date for the logging record")
_date = datetime.datetime.utcnow()
_date = _get_date(date)
except Exception:
self.log.exception("Exception while date evaluation")
_date = datetime.datetime.utcnow()

# We need to specify that timezone is UTC because otherwise timestamp
# assumes local time while we mean UTC.
epoc = _date.replace(tzinfo=datetime.timezone.utc).timestamp() - MAGIC_EPOC_NUMBER
_date = [datetime.datetime.utcnow()]

cmd = (
"INSERT INTO LoggingInfo (JobId, Status, MinorStatus, ApplicationStatus, "
+ "StatusTime, StatusTimeOrder, StatusSource) VALUES (%d,'%s','%s','%s','%s',%f,'%s')"
% (int(jobID), status, minorStatus, applicationStatus[:255], str(_date), epoc, source[:32])
+ "StatusTime, StatusTimeOrder, StatusSource) VALUES "
)

return self._update(cmd)
if not isinstance(jobID, list):
jobID = [jobID]

if isinstance(status, str):
status = [status] * len(jobID)
if isinstance(minorStatus, str):
minorStatus = [minorStatus] * len(jobID)
if isinstance(applicationStatus, str):
applicationStatus = [applicationStatus[:255]] * len(jobID)
if isinstance(_date, datetime.datetime):
_date = [_date] * len(jobID)

epocs = []
for dt in _date:
epoc = dt.replace(tzinfo=datetime.timezone.utc).timestamp() - MAGIC_EPOC_NUMBER
epocs.append(epoc)
cmd = cmd + "(%s, %s, %s, %s, %s, %s, %s)"
data = list(
zip(jobID, status, minorStatus, applicationStatus, _date, epocs, [source[:32]] * len(jobID), strict=True)
)
return self._updatemany(cmd, data)

#############################################################################
def getJobLoggingInfo(self, jobID):
Expand Down
25 changes: 20 additions & 5 deletions src/DIRAC/WorkloadManagementSystem/Service/JobManagerHandler.py
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,9 @@ def export_submitJob(self, jobDesc):
jobDescList = [jobDesc]

jobIDList = []
statusList = []
minorStatusList = []
timeStampList = []

if parametricJob:
initialStatus = JobStatus.SUBMITTING
Expand All @@ -199,13 +202,25 @@ def export_submitJob(self, jobDesc):
return result

jobID = result["JobID"]
self.log.info(f'Job added to the JobDB", "{jobID} for {self.ownerDN}/{self.ownerGroup}')

self.jobLoggingDB.addLoggingRecord(
jobID, result["Status"], result["MinorStatus"], date=result["TimeStamp"], source="JobManager"
)
self.log.info(f"Job added to the JobDB", f"{jobID} for {self.ownerDN}/{self.ownerGroup}")

jobIDList.append(jobID)
statusList.append(result["Status"])
minorStatusList.append(result["MinorStatus"])
timeStampList.append(result["TimeStamp"])

# insert records in logging DB

# For parametric jobs I can insert logging records in a bulk
if parametricJob and len(set(jobIDList)) == len(jobIDList):
result = self.jobLoggingDB.addLoggingRecord(
jobIDList, statusList, minorStatusList, date=timeStampList, source="JobManager"
)
else:
for jobID, status, minorStatus, timeStamp in zip(jobIDList, statusList, minorStatusList, timeStampList):
result = self.jobLoggingDB.addLoggingRecord(
jobID, status, minorStatus, date=timeStamp, source="JobManager"
)

# Set persistency flag
retVal = gProxyManager.getUserPersistence(self.ownerDN, self.ownerGroup)
Expand Down
18 changes: 18 additions & 0 deletions tests/Integration/WorkloadManagementSystem/Test_JobLoggingDB.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,4 +46,22 @@ def test_JobStatus(jobLoggingDB: JobLoggingDB):
result = jobLoggingDB.getWMSTimeStamps(1)
assert result["OK"] is True, result["Message"]

now = datetime.datetime.utcnow()
result = jobLoggingDB.addLoggingRecord(
[2, 3, 4, 5],
status=["testing", "testing", "testing", "testing"],
minorStatus=["mn", "mn", "mn", "mn"],
date=[now, now, now, now],
source="Unittest",
)
assert result["OK"] is True, result["Message"]

result = jobLoggingDB.getJobLoggingInfo(2)
assert result["OK"] is True, result["Message"]
assert result["Value"][-1][0:3] == ("testing", "mn", "Unknown")

result = jobLoggingDB.getJobLoggingInfo(5)
assert result["OK"] is True, result["Message"]
assert result["Value"][-1][0:3] == ("testing", "mn", "Unknown")

jobLoggingDB.deleteJob(1)

0 comments on commit ec01262

Please sign in to comment.