diff --git a/src/DIRAC/TransformationSystem/Client/TransformationClient.py b/src/DIRAC/TransformationSystem/Client/TransformationClient.py index 9c82d2683d3..7f1b7daa81a 100644 --- a/src/DIRAC/TransformationSystem/Client/TransformationClient.py +++ b/src/DIRAC/TransformationSystem/Client/TransformationClient.py @@ -163,6 +163,7 @@ def getTransformationFiles( timeout=1800, offset=0, maxfiles=None, + columns=None, ): """gets all the transformation files for a transformation, incrementally. "limit" here is just used to determine the offset. @@ -173,34 +174,39 @@ def getTransformationFiles( condDict = {} if timeStamp is None: timeStamp = "LastUpdate" - # getting transformationFiles - incrementally - if "LFN" in condDict: - if isinstance(condDict["LFN"], str): - lfnList = [condDict["LFN"]] - else: - lfnList = sorted(condDict["LFN"]) - # If a list of LFNs is given, use chunks of 1000 only - limit = limit if limit else 1000 + + if "LFN" not in condDict: + res = rpcClient.getTransformationFiles( + condDict, older, newer, timeStamp, orderAttribute, maxfiles, offset, columns + ) + # TransformationDB.getTransformationFiles includes a "Records"/"ParameterNames" + # that we don't want to return to the client so explicitly return S_OK with the value + if not res["OK"]: + return res + return S_OK(res["Value"]) + + # If LFNs requested, request in small batches, because... + # Probably not needed? 
Because this should always be a list + if isinstance(condDict["LFN"], str): + lfnList = [condDict["LFN"]] else: - # By default get by chunks of 10000 files - lfnList = [] - limit = limit if limit else 10000 + lfnList = sorted(condDict["LFN"]) + # If a list of LFNs is given, default to chunks of 1000 only + limit = limit if limit else 1000 + transID = condDict.get("TransformationID", "Unknown") offsetToApply = offset retries = 5 while True: - if lfnList: - # If list is exhausted, exit - if offsetToApply >= len(lfnList): - break - # Apply the offset to the list of LFNs - condDict["LFN"] = lfnList[offsetToApply : offsetToApply + limit] - # No limit and no offset as the list is limited already - res = rpcClient.getTransformationFiles(condDict, older, newer, timeStamp, orderAttribute, None, None) - else: - res = rpcClient.getTransformationFiles( - condDict, older, newer, timeStamp, orderAttribute, limit, offsetToApply - ) + # If list is exhausted, exit + if offsetToApply >= len(lfnList): + break + # Apply the offset to the list of LFNs + condDict["LFN"] = lfnList[offsetToApply : offsetToApply + limit] + # No limit and no offset as the list is limited already + res = rpcClient.getTransformationFiles( + condDict, older, newer, timeStamp, orderAttribute, None, None, columns + ) if not res["OK"]: gLogger.error( "Error getting files for transformation %s (offset %d), %s" @@ -211,36 +217,19 @@ def getTransformationFiles( if retries: continue return res - else: - condDictStr = str(condDict) - log = gLogger.debug if len(condDictStr) > 100 else gLogger.verbose - if not log( - "For conditions %s: result for limit %d, offset %d: %d files" - % (condDictStr, limit, offsetToApply, len(res["Value"])) - ): - gLogger.verbose( - "For condition keys %s (trans %s): result for limit %d, offset %d: %d files" - % ( - str(sorted(condDict)), - condDict.get("TransformationID", "None"), - limit, - offsetToApply, - len(res["Value"]), - ) - ) - if res["Value"]: - transformationFiles += res["Value"] 
- # Limit the number of files returned - if maxfiles and len(transformationFiles) >= maxfiles: - transformationFiles = transformationFiles[:maxfiles] - break - # Less data than requested, exit only if LFNs were not given - if not lfnList and len(res["Value"]) < limit: + gLogger.verbose(f"Result for limit {limit}, offset {offsetToApply}: {len(res['Value'])} files") + if res["Value"]: + transformationFiles += res["Value"] + # Limit the number of files returned + if maxfiles and len(transformationFiles) >= maxfiles: + transformationFiles = transformationFiles[:maxfiles] break - offsetToApply += limit - # Reset number of retries for next chunk - retries = 5 - + # Less data than requested, exit only if LFNs were not given + if not lfnList and len(res["Value"]) < limit: + break + offsetToApply += limit + # Reset number of retries for next chunk + retries = 5 return S_OK(transformationFiles) def getTransformationTasks( diff --git a/src/DIRAC/TransformationSystem/DB/TransformationDB.py b/src/DIRAC/TransformationSystem/DB/TransformationDB.py index f526c383a5d..f311d0d23bc 100755 --- a/src/DIRAC/TransformationSystem/DB/TransformationDB.py +++ b/src/DIRAC/TransformationSystem/DB/TransformationDB.py @@ -594,60 +594,49 @@ def getTransformationFiles( limit=None, offset=None, connection=False, + columns=None, ): """Get files for the supplied transformations with support for the web standard structure""" connection = self.__getConnection(connection) - req = f"SELECT {intListToString(self.TRANSFILEPARAMS)} FROM TransformationFiles" - originalFileIDs = {} - if condDict is None: - condDict = {} - if condDict or older or newer: - lfns = condDict.pop("LFN", None) - if lfns: - if isinstance(lfns, str): - lfns = [lfns] - res = self.__getFileIDsForLfns(lfns, connection=connection) - if not res["OK"]: - return res - originalFileIDs = res["Value"][0] - condDict["FileID"] = list(originalFileIDs) - for val in condDict.values(): - if not val: - return S_OK([]) + all_columns = ["LFN"] + 
self.TRANSFILEPARAMS + if columns is None: + columns = all_columns + elif not set(columns).issubset(all_columns): + return S_ERROR(f"Invalid columns requested, valid columns are: {all_columns}") + + req = ", ".join(f"df.{x}" if x == "LFN" else f"tf.{x}" for x in columns) + req = f"SELECT {req} FROM TransformationFiles tf" + if "LFN" in columns or (condDict and "LFN" in condDict): + req = f"{req} JOIN DataFiles df ON tf.FileID = df.FileID" + + fixedCondDict = {} + if condDict: + for key, value in condDict.items(): + if key in self.TRANSFILEPARAMS: + fixedCondDict[f"tf.{key}"] = value + elif key in ["LFN"]: + fixedCondDict[f"df.{key}"] = value + else: + return S_ERROR(f"Invalid key {key} in condDict") + if timeStamp: + timeStamp = f"tf.{timeStamp}" + if fixedCondDict or older or newer: + cond = self.buildCondition(fixedCondDict, older, newer, timeStamp, orderAttribute, limit, offset=offset) + # When buildCondition tries to quote the column names, it will fail due to the table alias + # So we need to move the single quotes to the right place + req += f" {cond.replace('`tf.', 'tf.`').replace('`df.', 'df.`')}" - req = "{} {}".format( - req, - self.buildCondition(condDict, older, newer, timeStamp, orderAttribute, limit, offset=offset), - ) res = self._query(req, conn=connection) if not res["OK"]: return res - transFiles = res["Value"] - fileIDs = [int(row[1]) for row in transFiles] - webList = [] - resultList = [] - if not fileIDs: - originalFileIDs = {} - else: - if not originalFileIDs: - res = self.__getLfnsForFileIDs(fileIDs, connection=connection) - if not res["OK"]: - return res - originalFileIDs = res["Value"][1] - for row in transFiles: - lfn = originalFileIDs[row[1]] - # Prepare the structure for the web - fDict = {"LFN": lfn} - fDict.update(dict(zip(self.TRANSFILEPARAMS, row))) - # Note: the line below is returning "None" if the item is None... This seems to work but is ugly... 
- rList = [lfn] + [str(item) if not isinstance(item, int) else item for item in row] - webList.append(rList) - resultList.append(fDict) + resultList = [dict(zip(columns, row)) for row in res["Value"]] + webList = [[str(item) if not isinstance(item, int) else item for item in row] for row in res["Value"]] + result = S_OK(resultList) result["Records"] = webList - result["ParameterNames"] = ["LFN"] + self.TRANSFILEPARAMS + result["ParameterNames"] = columns return result def getFileSummary(self, lfns, connection=False): diff --git a/src/DIRAC/TransformationSystem/Service/TransformationManagerHandler.py b/src/DIRAC/TransformationSystem/Service/TransformationManagerHandler.py index 209400aa2fe..d566dde6e65 100644 --- a/src/DIRAC/TransformationSystem/Service/TransformationManagerHandler.py +++ b/src/DIRAC/TransformationSystem/Service/TransformationManagerHandler.py @@ -289,6 +289,7 @@ def export_getTransformationFiles( orderAttribute=None, limit=None, offset=None, + columns=None, ): if not condDict: condDict = {} @@ -301,6 +302,7 @@ def export_getTransformationFiles( limit=limit, offset=offset, connection=False, + columns=columns, ) #################################################################### diff --git a/tests/Integration/TransformationSystem/Test_Client_Transformation.py b/tests/Integration/TransformationSystem/Test_Client_Transformation.py index 3cf1287253c..710ce4ff061 100644 --- a/tests/Integration/TransformationSystem/Test_Client_Transformation.py +++ b/tests/Integration/TransformationSystem/Test_Client_Transformation.py @@ -114,6 +114,17 @@ def test_addTasksAndFiles(self): for f in res["Value"]: self.assertEqual(f["Status"], TransformationFilesStatus.ASSIGNED) + # make sure we can selectively select LFNs + res = self.transClient.getTransformationFiles({"TransformationID": transID, "LFN": ["/aa/lfn.1.txt"]}) + assert res["OK"], res + assert len(res["Value"]) == 1, res + assert "TargetSE" in res["Value"][0].keys(), res + + # make sure we can selectively 
select columns + res = self.transClient.getTransformationFiles({"TransformationID": transID}, columns=["LFN", "Status"]) + assert res["OK"], res + assert sorted(res["Value"][0]) == ["LFN", "Status"], res + # now adding a new Transformation with new tasks, and introducing a mix of insertion, # to test that the trigger works as it should res = self.transClient.addTransformation(