Skip to content

Commit

Permalink
updated reader.py to use XRootD glob when available, falling back to the standard glob if the XRootD import fails
Browse files Browse the repository at this point in the history
  • Loading branch information
Alexander May committed Mar 16, 2023
1 parent e51843c commit e781fea
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 17 deletions.
52 changes: 37 additions & 15 deletions src/pythium/sklimming/reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,33 +10,46 @@
import numpy as np
#============== System and Python Imports
from typing import Callable, Dict, List, Tuple, Union
import os
import os, glob
import re
import time
import psutil
import gc
import XRootD.client as client
import XRootD.client.glob_funcs as glob
#============ thbbanalysis Imports
from pythium.common.branches import *
from pythium.common.tools import combine_list_of_dicts
#import thbbanalysis.sklimming as sklim
from pythium.sklimming import writer
from pythium.common import tools


# Prefer XRootD's remote-aware glob (handles root:// URLs); fall back to the
# standard-library glob when the XRootD Python bindings are not installed.
try:
    import XRootD.client as client
    import XRootD.client.glob_funcs as remote_glob
except ImportError:
    remote_glob_imported = False
else:
    remote_glob_imported = True

# Type aliases shared by the sklimming reader.
# NOTE(review): these require `Union` and `Callable` from typing — confirm the
# top-of-file import brings both into scope.
CfgType = Dict[str, Dict[str, Union[str,bool,int,float]]]  # config: section -> option -> scalar value
SampleDataType = Dict[str, List[Union[str,float,int]]]  # per-sample lists of scalar values, keyed by name
CutFuncType = Callable[..., "ak.Array"]  # selection callable returning an awkward Array
CutFuncArgsType = Dict[str,List[Union[str,float,int]]]  # keyword arguments forwarded to a cut function
BranchStatusType = Dict[str,Dict[str,List[Union["Branch",str]]]]  # nested mapping of Branch objects / branch names

def get_file_size(file)->float:
with client.File() as f:
f.open(file)
info=f.stat()
return info[1].size*1e-9
def get_file_size(file: str)->float:
    '''
    Find the size of a file in GB from its filename
    Args:
        file: A string containing a filename
    Return:
        A float of the size of the file in GB
    '''
    # When the XRootD bindings imported successfully, stat through the XRootD
    # client so remote paths (e.g. root://...) work; otherwise fall back to a
    # plain local-filesystem stat.
    if remote_glob_imported:
        with client.File() as f:
            # NOTE(review): the status returned by open() is not checked — if the
            # open fails, the stat below may yield no StatInfo; confirm callers
            # only pass existing, readable files.
            f.open(file)
            info=f.stat()
            # XRootD stat() returns a (status, statinfo) pair; .size is in bytes,
            # converted to decimal gigabytes here.
            return info[1].size*1e-9
    else:
        # os.path.getsize returns bytes; convert to decimal gigabytes (1e9 bytes).
        return os.path.getsize(file)*1e-9

def decorate_sample_tag(tags:List[str])->List[str]:
'''
Expand All @@ -46,7 +59,7 @@ def decorate_sample_tag(tags:List[str])->List[str]:
Return:
a list with decorated elements
'''
return [f'{tag}*' for tag in tags]
return [f'*{tag}*' for tag in tags]


def make_sample_path(locations: List[Path], tags:List[str]) -> List[str]:
Expand All @@ -58,10 +71,14 @@ def make_sample_path(locations: List[Path], tags:List[str]) -> List[str]:
Return:
paths: A list of full paths up to sample tags
'''
paths = [((str(loc)+'/') if str(loc) != '.' else '') +tag for tag in tags for loc in locations]
#paths = [str(loc)+'/'+tag for tag in tags for loc in locations/pytths = [str(loc)+'/'+tag for tag in tags for loc in locations]
dirpaths = [p+'/.*' for path in paths for p in glob.glob(path) if os.path.isdir(p)]
fpaths = list(set([path for path in paths for p in glob.glob(path) if not os.path.isdir(p) ]))
if remote_glob_imported:
paths = [((str(loc)+'/') if str(loc) != '.' else '') +tag for tag in tags for loc in locations]
dirpaths = [p+'/.*' for path in paths for p in remote_glob.glob(path) if os.path.isdir(p)]
fpaths = list(set([path for path in paths for p in remote_glob.glob(path) if not os.path.isdir(p) ]))
else:
paths = [str(loc)+'/'+tag for tag in tags for loc in locations]
dirpaths = [p+'/.*' for path in paths for p in glob.glob(path) if os.path.isdir(p)]
fpaths = list(set([path for path in paths for p in glob.glob(path) if not os.path.isdir(p) ]))
paths = dirpaths+fpaths

return paths
Expand Down Expand Up @@ -139,7 +156,12 @@ def run_workflow(paths: List[str], trees_to_branch_names: Dict[str, str], sample
out_idx = 0
for path in paths:
logger.info(f'Processing data from the following path: \n {path}')
files = glob.glob(path) # Get list of files matching path regex

if remote_glob_imported:
files = remote_glob.glob(path) # Get list of files matching path regex
else:
files = glob.glob(path)

if len(files) == 0:
logger.warning(f"No files were found in {path}, skipping")
continue
Expand Down
3 changes: 1 addition & 2 deletions src/pythium/sklimming/writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,7 @@ def write_sample(sample_data, sample, cfg, ext='H5', suffix=''):
elif ext == 'parquet':
for tree, data in sample_data.items():
outfile += f"__{tree}.parquet"
for key in ak.fields(data): #remove events with missing data
data = data[~ak.is_none(data[key])]
data = data[~ak.is_none(data)]
ak.to_parquet(data, outfile)
outfile = outfile.replace(f"__{tree}.parquet", "")
elif ext == "json":
Expand Down

0 comments on commit e781fea

Please sign in to comment.