Merge pull request #28 from Moelf/robust_pmap
Robust pmap
gojakuch authored Oct 10, 2023
2 parents a5dc7f2 + 506b484 commit d0c9cec
Showing 7 changed files with 87 additions and 42 deletions.
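The substance of the PR is in src/main_loop.jl: the bare `pmap` call is replaced by ProgressMeter's `progress_map` driving Parallelism's `robust_pmap`, so the distributed histogram fill reports progress and retries tasks whose worker died instead of aborting the whole run. A minimal sketch of that pattern outside this repo (the `heavy_work` function and the task range are illustrative placeholders, not part of the commit):

    using Distributed
    addprocs(4)                                     # illustrative worker count
    @everywhere using ProgressMeter, Parallelism
    @everywhere heavy_work(x) = (sleep(0.1); x^2)   # stand-in for per-file histogramming

    # progress_map drives any map-like function and draws a progress bar;
    # robust_pmap re-runs tasks whose worker failed instead of failing the whole map
    results = progress_map(1:100; mapfun = robust_pmap) do x
        heavy_work(x)
    end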
26 changes: 19 additions & 7 deletions Manifest.toml
@@ -1,8 +1,8 @@
 # This file is machine-generated - editing it directly is not advised
 
-julia_version = "1.9.0"
+julia_version = "1.9.2"
 manifest_format = "2.0"
-project_hash = "c5ae1c70a281d27e3b1000f705b30462c3a49c1b"
+project_hash = "9f74d7363441a1def24bda56204ea1a9c50c6c57"
 
 [[deps.AbstractFFTs]]
 deps = ["LinearAlgebra"]
@@ -225,7 +225,7 @@ weakdeps = ["Dates", "LinearAlgebra"]
 [[deps.CompilerSupportLibraries_jll]]
 deps = ["Artifacts", "Libdl"]
 uuid = "e66e0078-7015-5450-92f7-15fbd957f2ae"
-version = "1.0.2+0"
+version = "1.0.5+0"
 
 [[deps.ConcurrentUtilities]]
 deps = ["Serialization", "Sockets"]
@@ -874,6 +874,12 @@ git-tree-sha1 = "51d946d38d62709d6a2d37ea9bcc30c80c686801"
 uuid = "eff96d63-e80a-5855-80a2-b1b0885c5ab7"
 version = "2.9.0"
 
+[[deps.Memento]]
+deps = ["Dates", "Distributed", "Requires", "Serialization", "Sockets", "Test", "UUIDs"]
+git-tree-sha1 = "bb2e8f4d9f400f6e90d57b34860f6abdc51398e5"
+uuid = "f28f55f0-a522-5efc-85c2-fe41dfb9b2d9"
+version = "1.4.1"
+
 [[deps.Memoization]]
 deps = ["MacroTools"]
 git-tree-sha1 = "2f6913923a0cb8046134f5cbf8b4d7ba3c856a1d"
@@ -1036,6 +1042,12 @@ git-tree-sha1 = "84a314e3926ba9ec66ac097e3635e270986b0f10"
 uuid = "36c8627f-9965-5494-a995-c6b170f724f3"
 version = "1.50.9+0"
 
+[[deps.Parallelism]]
+deps = ["Distributed", "LinearAlgebra", "Memento"]
+git-tree-sha1 = "0c82d46ee150b88821bfec7ee148fe6657d9ec29"
+uuid = "c8c83da1-e5f9-4e2c-a857-b8617bac3554"
+version = "0.1.3"
+
 [[deps.Parameters]]
 deps = ["OrderedCollections", "UnPack"]
 git-tree-sha1 = "34c0e9ad262e5f7fc75b10a9952ca7692cfc5fbe"
@@ -1057,7 +1069,7 @@ version = "0.40.1+0"
 [[deps.Pkg]]
 deps = ["Artifacts", "Dates", "Downloads", "FileWatching", "LibGit2", "Libdl", "Logging", "Markdown", "Printf", "REPL", "Random", "SHA", "Serialization", "TOML", "Tar", "UUIDs", "p7zip_jll"]
 uuid = "44cfe95a-1eb2-52ea-b672-e2afdf69b78f"
-version = "1.9.0"
+version = "1.9.2"
 
 [[deps.PkgVersion]]
 deps = ["Pkg"]
@@ -1121,9 +1133,9 @@ uuid = "9abbd945-dff8-562f-b5e8-e1ebf5ef1b79"
 
 [[deps.ProgressMeter]]
 deps = ["Distributed", "Printf"]
-git-tree-sha1 = "d7a7aef8f8f2d537104f170139553b14dfe39fe9"
+git-tree-sha1 = "ae36206463b2395804f2787ffe172f44452b538d"
 uuid = "92933f4c-e287-5a05-a399-4b506db050ca"
-version = "1.7.2"
+version = "1.8.0"
 
 [[deps.QOI]]
 deps = ["ColorTypes", "FileIO", "FixedPointNumbers"]
@@ -1590,7 +1602,7 @@ version = "0.15.1+0"
 [[deps.libblastrampoline_jll]]
 deps = ["Artifacts", "Libdl"]
 uuid = "8e850b90-86db-534c-a0d3-1478176c7d93"
-version = "5.7.0+0"
+version = "5.8.0+0"
 
 [[deps.libdeflate_jll]]
 deps = ["Artifacts", "JLLWrappers", "Libdl"]
2 changes: 2 additions & 0 deletions Project.toml
@@ -12,8 +12,10 @@ Distributions = "31c24e10-a181-5473-b8eb-7969acd0382f"
 FHist = "68837c9b-b678-4cd5-9925-8a54edc8f695"
 JSON3 = "0f8b85d8-7281-11e9-16c2-39a750bddbf1"
 LorentzVectorHEP = "f612022c-142a-473f-8cfd-a09cf3793c6c"
+Parallelism = "c8c83da1-e5f9-4e2c-a857-b8617bac3554"
 Pluto = "c3e4b0f8-55cb-11ea-2926-15256bba5781"
 PlutoUI = "7f904dfe-b85e-4ff6-b463-dae2292396a8"
+ProgressMeter = "92933f4c-e287-5a05-a399-4b506db050ca"
 Revise = "295af30f-e4ad-537b-8983-00126c2a3abe"
 Test = "8dfed614-e22c-5e08-85e1-65c5234f0b40"
 UnROOT = "3cd96dde-e98d-4713-81e9-a4a1b0235ce9"
26 changes: 18 additions & 8 deletions src/LHC_AGC.jl
@@ -4,21 +4,32 @@ using UnROOT, FHist, LorentzVectorHEP, JSON3
 using LorentzVectorHEP: fromPxPyPzM
 using Combinatorics: Combinations
 using Distributions
+using ProgressMeter, Parallelism
 
 include("constants.jl")
 include("syst_utils.jl")
 include("main_loop.jl")
 include("visuals.jl")
 
-function nevts_total(process_tag, variation=:nominal)
-    NJSON[process_tag][variation][:nevts_total]
+"""
+    nevts_total(process_tag, n_files_max_per_sample, variation=:nominal)
+
+calculates the number of events in the processed files given the process tag and the maximal number of files per sample.
+"""
+function nevts_total(process_tag, n_files_max_per_sample, variation=:nominal)
+    mapreduce(+, first(NJSON[process_tag][variation][:files], n_files_max_per_sample)) do arg
+        arg["nevts"]
+    end
 end
 
 """
-Convert xrd path from JSON to local path
+    xrd_to_local(url)
+
+Convert xrd path from JSON to local path
 """
 function xrd_to_local(url)
-    joinpath(BASE_PATH[], last(split(url, '/')))
+    # joinpath(BASE_PATH[], last(split(url, '/')))
+    joinpath(BASE_PATH[], last(split(url, '/'), 2)...)
 end
 
 const TAG_PATH_DICT =
@@ -29,7 +40,7 @@ const TAG_PATH_DICT =
 )
 
 """
-    download_data(N; process_tags=[:ttbar])
+    download_data(N = MAX_N_FILES_PER_SAMPLE[]; process_tags = [:ttbar], variation_tags = [:nominal])
 
 Download `N` files for each of the process tags.
@@ -53,7 +64,7 @@ function download_data(N = MAX_N_FILES_PER_SAMPLE[]; process_tags = [:ttbar], va
 end
 
 """
-    generate_workspace_file(all_hists::Dict, filename, real_data; rebin2=true, systematics=false)::Dict
+    generate_workspace_file(all_hists::DictT, filename, real_data; rebin2=true, systematics=false)::Dict where DictT
 
 Generates a workspace dictionary, writes it to a JSON file and returns
@@ -97,7 +108,6 @@ function generate_workspace_file(all_hists::DictT, filename, real_data; rebin2=t
     for evt_type in keys(all_hists)
         norm_factors[evt_type] = Dict{Symbol, Float64}()
         for x in keys(region_names)
-            # symmetric reconstruction of the _down
             for (var, var_down) in (values(modifiers_dict[evt_type]))
                 norm = norm_factors[evt_type][Symbol(x, var)] = integral(all_hists[evt_type][Symbol(x, var)])/integral(all_hists[evt_type][Symbol(x, :_nominal)])
                 all_hists[evt_type][Symbol(x, var)] = all_hists[evt_type][Symbol(x, var)]*(1. / norm) # normalise the up variation
@@ -109,7 +119,7 @@ function generate_workspace_file(all_hists::DictT, filename, real_data; rebin2=t
             end
         end
     end
-    # reconstruct these separately for ttbar only
+    # reconstruct these (symmetrically) separately for ttbar only
    for x in keys(region_names)
        for (var, var_down) in [(:_ME_var, :_ME_var_sym), (:_PS_var, :_PS_var_sym)]
            all_hists[:ttbar][Symbol(x, var_down)] = all_hists[:ttbar][Symbol(x, :_nominal)]*2 - all_hists[:ttbar][Symbol(x, var)]
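Two behavioural points in the hunks above are easy to miss: `nevts_total` now sums the per-file `nevts` over only the first `n_files_max_per_sample` entries, so event weights match the files actually processed, and `xrd_to_local` now keeps the last two URL components, presumably so files from different subdirectories no longer collide in one flat directory. A toy check of both patterns with made-up values (the JSON layout is paraphrased; the numbers and the URL are not from the repo):

    # hypothetical stand-in for NJSON[process_tag][variation][:files]
    files = [Dict("nevts" => 100), Dict("nevts" => 250), Dict("nevts" => 999)]

    N = 2
    nevts = mapreduce(+, first(files, N)) do f   # same pattern as the new nevts_total
        f["nevts"]
    end
    @assert nevts == 350                         # third file is not counted

    url = "root://host//store/mc/nominal/f1.root"   # illustrative xrootd URL
    last(split(url, '/'))                           # "f1.root"          (old behaviour)
    joinpath(last(split(url, '/'), 2)...)           # "nominal/f1.root"  (new behaviour)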
3 changes: 2 additions & 1 deletion src/constants.jl
@@ -9,6 +9,7 @@ const xsec_info = Dict(
 const NJSON = joinpath(dirname(@__DIR__), "ntuples_nanoaod.json") |> read |> JSON3.read;
 const TAGS = keys(NJSON)
 
-const BASE_PATH = Ref(joinpath(dirname(@__DIR__), "data"))
+#const BASE_PATH = Ref(joinpath(dirname(@__DIR__), "data"))
+const BASE_PATH = Ref("/data/alheld/AGC/datasets/nanoAOD/")
 const MAX_N_FILES_PER_SAMPLE = Ref(2)
 const LUMI = 3378.0 # /pb
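The new default path is machine-specific, but because `BASE_PATH` stays a `const Ref` rather than a plain constant, it can be repointed at runtime without editing the source or recompiling the module. For example (the path here is illustrative):

    # e.g. in a session on another machine with a local copy of the ntuples
    LHC_AGC.BASE_PATH[] = "/tmp/agc_data/"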
65 changes: 42 additions & 23 deletions src/main_loop.jl
@@ -1,45 +1,64 @@
 """
+    get_all_hists(; do_file_variations::Bool=true, wgt = 0.0, n_files_max_per_sample = MAX_N_FILES_PER_SAMPLE[], tags = LHC_AGC.TAGS, histo_getter=get_histo)
+
+Produces the `all_hists` dictionary that may be required for building `workspace.json` and plotting.
+`histo_getter` should either be `get_histo` or `get_histo_distributed`.
+"""
+function get_all_hists(; do_file_variations::Bool=true, wgt = 0.0, n_files_max_per_sample = MAX_N_FILES_PER_SAMPLE[], tags = LHC_AGC.TAGS, histo_getter=get_histo)
+    Dict(tag => histo_getter(tag; do_file_variations, wgt, n_files_max_per_sample) for tag in tags)
+end
+
+"""
     get_histo(process_tag::Symbol; do_file_variations::Bool=true, wgt = 0.0, n_files_max_per_sample = MAX_N_FILES_PER_SAMPLE[])
 """
 function get_histo(process_tag::Symbol; do_file_variations::Bool=true, wgt = 0.0, n_files_max_per_sample = MAX_N_FILES_PER_SAMPLE[])
     N = n_files_max_per_sample
     if iszero(wgt)
         wgt = LUMI * xsec_info[process_tag] / nevts_total(process_tag)
     end
 
     file_variation_tags = (do_file_variations ? keys(TAG_PATH_DICT[process_tag]) : [:nominal])
 
     all_hists = reduce(merge, [
-        mapreduce(mergewith(+), @view TAG_PATH_DICT[process_tag][variation_tag][begin:N]) do path
+        mapreduce(mergewith(+), first(TAG_PATH_DICT[process_tag][variation_tag], N)) do path
             get_histo(LazyTree(path, "Events"), wgt, file_variation=variation_tag)
         end for variation_tag in file_variation_tags
     ])
     all_hists
 end
 
-"""
-    get_histo_distributed(process_tag::Symbol; do_file_variations::Bool=true, wgt = 0.0, n_files_max_per_sample = MAX_N_FILES_PER_SAMPLE[])
-"""
-function get_histo_distributed(process_tag::Symbol; do_file_variations::Bool=true, wgt = 0.0, n_files_max_per_sample = MAX_N_FILES_PER_SAMPLE[])
+Base.@kwdef struct AnalysisTask
+    proc_tag::Symbol
+    path::String
+    wgt::Float64
+    variation_tag::Symbol
+end
+
+function get_tasks(proc_tag::Symbol; do_file_variations::Bool=true, wgt = 0.0, n_files_max_per_sample = MAX_N_FILES_PER_SAMPLE[])
     N = n_files_max_per_sample
-    if iszero(wgt)
-        wgt = LUMI * xsec_info[process_tag] / nevts_total(process_tag)
-    end
 
-    file_variation_tags = (do_file_variations ? keys(TAG_PATH_DICT[process_tag]) : [:nominal])
+    file_variation_tags = (do_file_variations ? keys(TAG_PATH_DICT[proc_tag]) : [:nominal])
 
-    files = Tuple{String, Symbol}[]
+    tasks = AnalysisTask[]
     for variation_tag in file_variation_tags
-        append!(files, Tuple{String, Symbol}[(path, variation_tag) for path in @view TAG_PATH_DICT[process_tag][variation_tag][begin:N]])
+        _wgt = iszero(wgt) ? (LUMI * xsec_info[proc_tag] / nevts_total(proc_tag, N, variation_tag)) : wgt
+        append!(tasks, [AnalysisTask(; proc_tag, path, wgt = _wgt, variation_tag) for path in first(TAG_PATH_DICT[proc_tag][variation_tag],N)])
     end
+    return tasks
+end
 
-    mainloop = function (tuple)
-        path, variation_tag = tuple
-        get_histo(LazyTree(path, "Events"), wgt,file_variation=variation_tag)
+function get_histo(task::AnalysisTask)
+    get_histo(LazyTree(task.path, "Events"), task.wgt; file_variation = task.variation_tag)
 end
 
+"""
+    get_histo_distributed(process_tags::Vector{Symbol}; do_file_variations::Bool=true, wgt = 0.0, n_files_max_per_sample = MAX_N_FILES_PER_SAMPLE[])
+"""
+function get_histo_distributed(process_tags::Vector{Symbol}; do_file_variations::Bool=true, wgt = 0.0, n_files_max_per_sample = MAX_N_FILES_PER_SAMPLE[])
+    all_tasks = mapreduce(p-> get_tasks(p; do_file_variations, wgt, n_files_max_per_sample), vcat, process_tags)
+    dicts = progress_map(all_tasks; mapfun=robust_pmap) do task
+        return Dict(task.proc_tag => get_histo(task))
+    end
-    dicts = pmap(mainloop, files)
 
-    return reduce(mergewith(+), dicts)
+    return reduce(mergewith(mergewith(+)), dicts)
 end
 
 function generate_hists(file_variation::Symbol)
@@ -65,6 +84,7 @@ function generate_hists(file_variation::Symbol)
     return hists
 end
 
+## THIS IS ACTUALLY THE MAIN LOOP, WHICH GETS CALLED FROM EVERY VERSION OF get_histo
 """
     get_histo(tree, wgt; file_variation::Symbol=:nominal, evts=nothing)
@@ -73,7 +93,8 @@ end
 function get_histo(tree, wgt; file_variation::Symbol=:nominal, evts=nothing)
     is_nominal_file = (:nominal == file_variation)
     hists = generate_hists(file_variation)
-    Threads.@threads for evt in tree
+    #Threads.@threads for evt in tree
+    for evt in tree
         # single lepton requirement
         (; Electron_pt, Muon_pt) = evt
         (count(>(25), Electron_pt) + count(>(25), Muon_pt) != 1) && continue
@@ -106,10 +127,8 @@ function get_histo(tree, wgt; file_variation::Symbol=:nominal, evts=nothing)
 
         # construct jet lorentz vector
         jet_p4 = @views LorentzVectorCyl.(Jet_pt[jet_pt_mask], Jet_eta[jet_pt_mask], Jet_phi[jet_pt_mask], Jet_mass[jet_pt_mask])
-        #jet_p4 = @views fromPxPyPzM.(Jet_px[jet_pt_mask], Jet_py[jet_pt_mask], Jet_pz[jet_pt_mask], Jet_mass[jet_pt_mask])
 
-        Njets = length(jet_btag)
-        # Njets == length(jet_p4) || error("impossible reached")
+        Njets = length(jet_btag)
 
         # tri jet combinatorics
         max_pt = -Inf
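Since each task now returns a one-entry Dict(proc_tag => hists_dict), the final reduction needs the nested `mergewith(mergewith(+))`: the outer merge unions process tags across workers, and where two workers processed files of the same tag, the inner `mergewith(+)` adds the histograms with matching names. A toy illustration with numbers standing in for the FHist histograms (which likewise support `+`):

    d1 = Dict(:ttbar => Dict(:h_nominal => 1.0))
    d2 = Dict(:ttbar => Dict(:h_nominal => 2.0, :h_up => 5.0))
    d3 = Dict(:wjets => Dict(:h_nominal => 7.0))

    reduce(mergewith(mergewith(+)), [d1, d2, d3])
    # Dict(:ttbar => Dict(:h_nominal => 3.0, :h_up => 5.0),
    #      :wjets => Dict(:h_nominal => 7.0))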
3 changes: 2 additions & 1 deletion src/visuals.jl
@@ -7,7 +7,8 @@ using CairoMakie
 Creates a plot for the stack of histograms in the 4j2b region for the mbjj variable.
-`all_hists` should be a dictionary of the form evt_type => hists_dict, where hists_dict is what get_histo(evt_type, ...) would return.
+`all_hists` should be a dictionary that `LHC_AGC.get_all_hists` would return or of the equivalent form: `evt_type => hists_dict`, where hists_dict is what get_histo(evt_type, ...) would return.
+if `evt_types` is `nothing` `keys(all_hists)` are used instead.
 `color` should be a vector of colors of a meaningful size.
4 changes: 2 additions & 2 deletions test/runtests.jl
@@ -16,7 +16,7 @@ end
 """
 A function for test comparison
 """
-function AGC_quicktest(filepath, _bincounts, wgt, hist_types_to_match; eps=0.01)
+function AGC_quicktest(filepath, _bincounts, wgt, hist_types_to_match; eps=0.01) # this one runs the get_histo and compares the processed events by id as well as the bincounts
     tt_tree = LazyTree(filepath, "Events")
     evts = Dict(k => Int[] for k in hist_types_to_match)
     res = LHC_AGC.get_histo(tt_tree, wgt; evts=evts)
@@ -33,7 +33,7 @@ function AGC_quicktest(filepath, _bincounts, wgt, hist_types_to_match; eps=0.01)
 
     res
 end
-function AGC_quicktest(res, _bincounts; eps=0.01)
+function AGC_quicktest(res, _bincounts; eps=0.01) # this one doesn't run the get_histo and expects the result to be already given to it and only compares the bincounts
     for k in keys(_bincounts)
         # test bincounts
         @test (maximum(abs.(bincounts(res[k]) - _bincounts[k])) <= eps)
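For orientation, a hedged sketch of how the two methods split the work (the file path, weight, and reference bincounts below are placeholders, not repo fixtures): the first method runs `LHC_AGC.get_histo` itself and checks the processed event ids as well as the bincounts; the second skips the processing and only re-checks the bincounts of a result it is handed.

    # hypothetical inputs
    filepath = "/tmp/ttbar_nominal_1.root"
    ref = Dict(Symbol("4j2b_nominal") => zeros(11))      # reference bincounts per histogram key

    res = AGC_quicktest(filepath, ref, 1.0, keys(ref))   # runs get_histo, checks ids + bincounts
    AGC_quicktest(res, ref)                              # re-checks bincounts only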
