Skip to content

Commit

Permalink
Merge pull request #7833 from DIRACGridBot/cherry-pick-2-c35d052af-in…
Browse files Browse the repository at this point in the history
…tegration

[sweep:integration] Improve getTransformationFiles performance
  • Loading branch information
chrisburr authored Oct 14, 2024
2 parents 4e54bdb + 764c218 commit 8bf26eb
Show file tree
Hide file tree
Showing 4 changed files with 86 additions and 95 deletions.
93 changes: 41 additions & 52 deletions src/DIRAC/TransformationSystem/Client/TransformationClient.py
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,7 @@ def getTransformationFiles(
timeout=1800,
offset=0,
maxfiles=None,
columns=None,
):
"""gets all the transformation files for a transformation, incrementally.
"limit" here is just used to determine the offset.
Expand All @@ -173,34 +174,39 @@ def getTransformationFiles(
condDict = {}
if timeStamp is None:
timeStamp = "LastUpdate"
# getting transformationFiles - incrementally
if "LFN" in condDict:
if isinstance(condDict["LFN"], str):
lfnList = [condDict["LFN"]]
else:
lfnList = sorted(condDict["LFN"])
# If a list of LFNs is given, use chunks of 1000 only
limit = limit if limit else 1000

if "LFN" not in condDict:
res = rpcClient.getTransformationFiles(
condDict, older, newer, timeStamp, orderAttribute, offset, maxfiles, columns
)
# TransformationDB.getTransformationFiles includes a "Records"/"ParameterNames"
# that we don't want to return to the client so explicitly return S_OK with the value
if not res["OK"]:
return res
return S_OK(res["Value"])

# If LFNs requested, request in small batches, because...
# Probably not needed? Because this should always be a list
if isinstance(condDict["LFN"], str):
lfnList = [condDict["LFN"]]
else:
# By default get by chunks of 10000 files
lfnList = []
limit = limit if limit else 10000
lfnList = sorted(condDict["LFN"])
# If a list of LFNs is given, default to chunks of 1000 only
limit = limit if limit else 1000

transID = condDict.get("TransformationID", "Unknown")
offsetToApply = offset
retries = 5
while True:
if lfnList:
# If list is exhausted, exit
if offsetToApply >= len(lfnList):
break
# Apply the offset to the list of LFNs
condDict["LFN"] = lfnList[offsetToApply : offsetToApply + limit]
# No limit and no offset as the list is limited already
res = rpcClient.getTransformationFiles(condDict, older, newer, timeStamp, orderAttribute, None, None)
else:
res = rpcClient.getTransformationFiles(
condDict, older, newer, timeStamp, orderAttribute, limit, offsetToApply
)
# If list is exhausted, exit
if offsetToApply >= len(lfnList):
break
# Apply the offset to the list of LFNs
condDict["LFN"] = lfnList[offsetToApply : offsetToApply + limit]
# No limit and no offset as the list is limited already
res = rpcClient.getTransformationFiles(
condDict, older, newer, timeStamp, orderAttribute, None, None, columns
)
if not res["OK"]:
gLogger.error(
"Error getting files for transformation %s (offset %d), %s"
Expand All @@ -211,36 +217,19 @@ def getTransformationFiles(
if retries:
continue
return res
else:
condDictStr = str(condDict)
log = gLogger.debug if len(condDictStr) > 100 else gLogger.verbose
if not log(
"For conditions %s: result for limit %d, offset %d: %d files"
% (condDictStr, limit, offsetToApply, len(res["Value"]))
):
gLogger.verbose(
"For condition keys %s (trans %s): result for limit %d, offset %d: %d files"
% (
str(sorted(condDict)),
condDict.get("TransformationID", "None"),
limit,
offsetToApply,
len(res["Value"]),
)
)
if res["Value"]:
transformationFiles += res["Value"]
# Limit the number of files returned
if maxfiles and len(transformationFiles) >= maxfiles:
transformationFiles = transformationFiles[:maxfiles]
break
# Less data than requested, exit only if LFNs were not given
if not lfnList and len(res["Value"]) < limit:
gLogger.verbose(f"Result for limit {limit}, offset {offsetToApply}: {len(res['Value'])} files")
if res["Value"]:
transformationFiles += res["Value"]
# Limit the number of files returned
if maxfiles and len(transformationFiles) >= maxfiles:
transformationFiles = transformationFiles[:maxfiles]
break
offsetToApply += limit
# Reset number of retries for next chunk
retries = 5

# Less data than requested, exit only if LFNs were not given
if not lfnList and len(res["Value"]) < limit:
break
offsetToApply += limit
# Reset number of retries for next chunk
retries = 5
return S_OK(transformationFiles)

def getTransformationTasks(
Expand Down
75 changes: 32 additions & 43 deletions src/DIRAC/TransformationSystem/DB/TransformationDB.py
Original file line number Diff line number Diff line change
Expand Up @@ -594,60 +594,49 @@ def getTransformationFiles(
limit=None,
offset=None,
connection=False,
columns=None,
):
"""Get files for the supplied transformations with support for the web standard structure"""
connection = self.__getConnection(connection)
req = f"SELECT {intListToString(self.TRANSFILEPARAMS)} FROM TransformationFiles"
originalFileIDs = {}
if condDict is None:
condDict = {}
if condDict or older or newer:
lfns = condDict.pop("LFN", None)
if lfns:
if isinstance(lfns, str):
lfns = [lfns]
res = self.__getFileIDsForLfns(lfns, connection=connection)
if not res["OK"]:
return res
originalFileIDs = res["Value"][0]
condDict["FileID"] = list(originalFileIDs)

for val in condDict.values():
if not val:
return S_OK([])
all_columns = ["LFN"] + self.TRANSFILEPARAMS
if columns is None:
columns = all_columns
elif not set(columns).issubset(all_columns):
return S_ERROR(f"Invalid columns requested, valid columns are: {all_columns}")

req = ", ".join(f"df.{x}" if x == "LFN" else f"tf.{x}" for x in columns)
req = f"SELECT {req} FROM TransformationFiles tf"
if "LFN" in columns or (condDict and "LFN" in condDict):
req = f"{req} JOIN DataFiles df ON tf.FileID = df.FileID"

fixedCondDict = {}
if condDict:
for key, value in condDict.items():
if key in self.TRANSFILEPARAMS:
fixedCondDict[f"tf.{key}"] = value
elif key in ["LFN"]:
fixedCondDict[f"df.{key}"] = value
else:
return S_ERROR(f"Invalid key {key} in condDict")
if timeStamp:
timeStamp = f"tf.{timeStamp}"
if fixedCondDict or older or newer:
cond = self.buildCondition(fixedCondDict, older, newer, timeStamp, orderAttribute, limit, offset=offset)
# When buildCondition tries to quote the column names, it will fail due to the table alias
# So we need to move the single quotes to the right place
req += f" {cond.replace('`tf.', 'tf.`').replace('`df.', 'df.`')}"

req = "{} {}".format(
req,
self.buildCondition(condDict, older, newer, timeStamp, orderAttribute, limit, offset=offset),
)
res = self._query(req, conn=connection)
if not res["OK"]:
return res

transFiles = res["Value"]
fileIDs = [int(row[1]) for row in transFiles]
webList = []
resultList = []
if not fileIDs:
originalFileIDs = {}
else:
if not originalFileIDs:
res = self.__getLfnsForFileIDs(fileIDs, connection=connection)
if not res["OK"]:
return res
originalFileIDs = res["Value"][1]
for row in transFiles:
lfn = originalFileIDs[row[1]]
# Prepare the structure for the web
fDict = {"LFN": lfn}
fDict.update(dict(zip(self.TRANSFILEPARAMS, row)))
# Note: the line below is returning "None" if the item is None... This seems to work but is ugly...
rList = [lfn] + [str(item) if not isinstance(item, int) else item for item in row]
webList.append(rList)
resultList.append(fDict)
resultList = [dict(zip(columns, row)) for row in res["Value"]]
webList = [[str(item) if not isinstance(item, int) else item for item in row] for row in res["Value"]]

result = S_OK(resultList)
result["Records"] = webList
result["ParameterNames"] = ["LFN"] + self.TRANSFILEPARAMS
result["ParameterNames"] = columns
return result

def getFileSummary(self, lfns, connection=False):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,7 @@ def export_getTransformationFiles(
orderAttribute=None,
limit=None,
offset=None,
columns=None,
):
if not condDict:
condDict = {}
Expand All @@ -301,6 +302,7 @@ def export_getTransformationFiles(
limit=limit,
offset=offset,
connection=False,
columns=columns,
)

####################################################################
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,17 @@ def test_addTasksAndFiles(self):
for f in res["Value"]:
self.assertEqual(f["Status"], TransformationFilesStatus.ASSIGNED)

# make sure we can selectively select LFNs
res = self.transClient.getTransformationFiles({"TransformationID": transID, "LFN": ["/aa/lfn.1.txt"]})
assert res["OK"], res
assert len(res["Value"]) == 1, res
assert "TargetSE" in res["Value"][0].keys(), res

# make sure we can selectively select columns
res = self.transClient.getTransformationFiles({"TransformationID": transID}, columns=["LFN", "Status"])
assert res["OK"], res
assert sorted(res["Value"][0]) == ["LFN", "Status"], res

# now adding a new Transformation with new tasks, and introducing a mix of insertion,
# to test that the trigger works as it should
res = self.transClient.addTransformation(
Expand Down

0 comments on commit 8bf26eb

Please sign in to comment.