Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Incorporate multiprocessing into fftvis #29

Open
wants to merge 61 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
61 commits
Select commit Hold shift + click to select a range
f1704a1
set structure of functions for simulating time inds
tyler-a-cox Oct 7, 2024
a6ea190
feat: continuing to implement multiprocessing...
steven-murray Oct 7, 2024
abc7890
feat: more accurate source coordinate transforms
steven-murray Oct 7, 2024
5a80fee
consolidate tests
tyler-a-cox Oct 7, 2024
5f74f36
ensure ra/dec have the correct dtypes
tyler-a-cox Oct 7, 2024
8c69cfa
use correct matvis version
tyler-a-cox Oct 7, 2024
2aea32d
test increasing number of processes
tyler-a-cox Oct 7, 2024
f761fad
fix nsim_sources
tyler-a-cox Oct 7, 2024
4c5c86f
feat: use ray
steven-murray Oct 8, 2024
de1875d
add ray to installation
tyler-a-cox Oct 8, 2024
27893c5
fix tests
tyler-a-cox Oct 8, 2024
1f3a0c7
fix precision issue in coordinate rotation
tyler-a-cox Oct 8, 2024
eebe4a1
update fftvis to use the most recent version of matvis
tyler-a-cox Oct 8, 2024
8457b34
fix testing imports
tyler-a-cox Oct 8, 2024
6ab3e29
allow user to choose between coordinate transforms
tyler-a-cox Oct 8, 2024
16fbd93
Use threadpoolctl to control underlying threading
steven-murray Oct 9, 2024
f96888c
test: pass source buffer on in matvis tests
steven-murray Oct 9, 2024
87f8a48
feat: new --hera and --nside args to profile
steven-murray Oct 9, 2024
73bd2da
ux: print main loop time
steven-murray Oct 9, 2024
7048880
condense einsum and reduce memory usage
tyler-a-cox Oct 9, 2024
4ebe174
fix transpose issue and overwrite beam with beam-sky product
tyler-a-cox Oct 9, 2024
31ea623
perf: remove unnecessary copies of A_s
steven-murray Oct 9, 2024
36d5e20
Merge branch 'mp_fftvis' of github.com:tyler-a-cox/fftvis into mp_fftvis
steven-murray Oct 9, 2024
aacf1ed
perf: new memory tracking for Ray task.
steven-murray Oct 10, 2024
e5791ce
perf: make rotation matrix contiguous
steven-murray Oct 10, 2024
155ff2e
Change default location of memray file
steven-murray Oct 10, 2024
4ffaa41
fix log statement
steven-murray Oct 10, 2024
8320a71
perf: limit object store memory because we don't need all of it.
steven-murray Oct 10, 2024
584f70d
lower shm store
steven-murray Oct 10, 2024
dd4adb3
fix error passing times from cli
steven-murray Oct 10, 2024
f544ca1
allow finufft multithreading w/ ray
tyler-a-cox Oct 11, 2024
0b18d5e
allow multi-threaded numpy when nprocesses < ncpus
steven-murray Oct 11, 2024
7adae83
Default trace_mem to False
steven-murray Oct 11, 2024
d98004e
cli: add spline order option
steven-murray Oct 11, 2024
3c91c68
refactor: clearer call to the task function
steven-murray Oct 22, 2024
314d0c7
temporarily add some debugging stuff
steven-murray Oct 22, 2024
646a8dd
fix: better estimate of required shm
steven-murray Oct 22, 2024
a6ba81f
fix: better estimate of required shm
steven-murray Oct 22, 2024
b43c5df
fix: don't always write out ray memory trackers
steven-murray Oct 23, 2024
182fbac
temporary memory debugging
steven-murray Oct 23, 2024
6bcfaf7
perf: better ray environments for tracking memory and shm
steven-murray Oct 23, 2024
d766f02
perf: better ray environments for tracking memory and shm
steven-murray Oct 23, 2024
2faab76
feat: add ability to set freq range for profiling
steven-murray Oct 23, 2024
69fe36d
perf: by default disable memory monitoring
steven-murray Oct 23, 2024
b24b815
maint: update to use BeamInterface API
steven-murray Nov 8, 2024
b019364
Merge pull request #30 from tyler-a-cox/beam-interface-update
tyler-a-cox Nov 11, 2024
9ec982c
add numba to deps
tyler-a-cox Nov 11, 2024
dadf91e
update memray to deps
tyler-a-cox Nov 19, 2024
677e107
maint: update matvis dep
steven-murray Nov 22, 2024
8f414b9
Merge branch 'beam-interface-update' into mp_fftvis
steven-murray Nov 22, 2024
906a333
test: fix test calling BeamInterface
steven-murray Nov 25, 2024
97c887c
fix: don't try printing shared mem on macos
steven-murray Nov 25, 2024
d09f68a
test: don't run multiprocessing tests on macos
steven-murray Nov 25, 2024
195de00
omit cli.py from coverage
tyler-a-cox Nov 27, 2024
4d85e24
compile functions not covered in codecov; omit
tyler-a-cox Nov 27, 2024
7dc4e6f
Exclude other compiled function
tyler-a-cox Nov 27, 2024
f70a038
add test coverage to utils.py
tyler-a-cox Nov 27, 2024
f815557
remove usage of hera_sim in tests
tyler-a-cox Nov 27, 2024
b50c9f2
exclude memory tracing lines from coverage
tyler-a-cox Nov 27, 2024
2785992
exclude compiled functions
tyler-a-cox Nov 27, 2024
3fd42d2
test without defining baselines
tyler-a-cox Nov 27, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions .coveragerc
Original file line number Diff line number Diff line change
@@ -1,7 +1,14 @@
[run]
include = fftvis/*
omit = fftvis/logutils.py
omit =
fftvis/logutils.py
fftvis/cli.py
branch = True

[report]
omit = */tests/*
omit = */tests/*
exclude_lines =
if trace_mem:
pragma: no cover
if isinstance(beam, UVBeam):
if _nbig != nprocesses:
4 changes: 3 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -158,4 +158,6 @@ cython_debug/
# and can be added to the global gitignore or merged into this file. For a more nuclear
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
#.idea/
.vscode
.vscode
*.flame
cprofile*
67 changes: 44 additions & 23 deletions fftvis/beams.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
import numpy as np
from pyuvdata import UVBeam

from pyuvdata.beam_interface import BeamInterface
import numba as nb

def _evaluate_beam(
A_s: np.ndarray,
beam: UVBeam,
beam: BeamInterface,
az: np.ndarray,
za: np.ndarray,
polarized: bool,
Expand Down Expand Up @@ -44,39 +43,61 @@ def _evaluate_beam(
for interpolation orders greater than linear.
"""
# Primary beam pattern using direct interpolation of UVBeam object
kw = (
{
"reuse_spline": True,
"check_azza_domain": False,
"spline_opts": spline_opts,
"interpolation_function": interpolation_function,
}
if isinstance(beam, UVBeam)
else {}
)
if isinstance(beam, UVBeam) and not beam.future_array_shapes:
beam.use_future_array_shapes()
kw = {
"reuse_spline": True,
"check_azza_domain": False,
"spline_opts": spline_opts,
"interpolation_function": interpolation_function,
}

interp_beam = beam.interp(
interp_beam = beam.compute_response(
az_array=az,
za_array=za,
freq_array=np.atleast_1d(freq),
**kw,
)[0]
)

if polarized:
interp_beam = interp_beam[:, :, 0, :]
else:
# Here we have already asserted that the beam is a power beam and
# has only one polarization, so we just evaluate that one.
interp_beam = np.sqrt(interp_beam[0, 0, 0, :])

A_s[:, :] = interp_beam
interp_beam = interp_beam[0, 0, 0, :]

# Check for invalid beam values
if check:
sm = np.sum(A_s)
sm = np.sum(interp_beam)
if np.isinf(sm) or np.isnan(sm):
raise ValueError("Beam interpolation resulted in an invalid value")

return A_s
return interp_beam

@nb.jit(
nopython=True,
parallel=False,
nogil=False
)
def get_apparent_flux_polarized(beam: np.ndarray, flux: np.ndarray): # pragma: no cover
"""Calculate apparent flux of the sources."""
nax, nfd, nsrc = beam.shape

for isrc in range(nsrc):
c = np.conj(beam[:, :, isrc])

i00 = (
c[0, 0] * beam[0, 0, isrc] +
c[1, 0] * beam[1, 0, isrc]
)
i01 = (
c[0, 0] * beam[0, 1, isrc] +
c[1, 0] * beam[1, 1, isrc]
)

i11 = (
c[0, 1] * beam[0, 1, isrc] +
c[1, 1] * beam[1, 1, isrc]
)
beam[0, 0, isrc] = i00 * flux[isrc]
beam[0, 1, isrc] = i01 * flux[isrc]
beam[1, 0, isrc] = np.conj(i01) * flux[isrc]
beam[1, 1, isrc] = i11 * flux[isrc]
147 changes: 147 additions & 0 deletions fftvis/cli.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
import typer
from matvis.cli import get_standard_sim_params, get_line_based_stats, get_summary_stats, get_label, get_redundancies
from .simulate import simulate_vis, _evaluate_vis_chunk
from pathlib import Path
from typing import Optional
import logging
from rich.console import Console
from rich.rule import Rule
from line_profiler import LineProfiler
import time
from pyuvdata.telescopes import get_telescope
import cProfile
import pstats
import os
import numpy as np
from hera_sim.antpos import hex_array

cns = Console()

logger = logging.getLogger('fftvis')
profiler = LineProfiler()

app = typer.Typer()

@app.command()
def run_profile(
analytic_beam: bool = True,
nfreq: int = 1,
ntimes: int = 1,
nants: int = 1,
nsource: int = 1,
double_precision: bool = True,
outdir: Path = Path(".").absolute(),
verbose: bool = True,
log_level: str = "INFO",
coord_method: str = "CoordinateRotationERFA",
naz: int = 360,
nza: int = 180,
nprocesses: int = 1,
update_bcrs_every: float = np.inf,
hera: int = 0,
nside: int = 0,
force_use_ray: bool = False,
trace_mem: bool = False,
beam_spline_order: int = 3,
freq_min: float = 100, # MHz
):
"""Run the script."""
logger.setLevel(log_level.upper())

if nside > 0:
nsource = 12*nside**2

(
ants,
flux,
ra,
dec,
freqs,
times,
cpu_beams,
beam_idx,
) = get_standard_sim_params(
analytic_beam, nfreq, ntimes, nants, nsource, nbeams=1, naz=naz, nza=nza,
freq_min=freq_min*1e6,
)
if hera > 0:
ants = hex_array(hera)

ants = {k: list(v) for k, v in ants.items()}
nants = len(ants)

cns.print(Rule("Running fftvis profile"))
cns.print(f" NANTS: {nants:>7}")
cns.print(f" NTIMES: {ntimes:>7}")
cns.print(f" NFREQ: {nfreq:>7}")
cns.print(f" NSOURCE: {nsource:>7}")
cns.print(f" DOUBLE-PRECISION: {double_precision:>7}")
cns.print(f" ANALYTIC-BEAM: {analytic_beam:>7}")
cns.print(f" COORDROT METHOD: {coord_method:>7}")
#cns.print(f" NPAIRS: {len(pairs) if pairs is not None else nants**2:>7}")
cns.print(f" NAZ: {naz:>7}")
cns.print(f" NZA: {nza:>7}")
cns.print(f" NPROCESSES: {nprocesses:>7}")
cns.print(f" INTERP ORDER: {beam_spline_order:>7}")

if coord_method == "CoordinateRotationERFA":
cns.print(f" BCRS UPDATE: {update_bcrs_every:>7}")
coord_params = {"update_bcrs_every": update_bcrs_every}
else:
coord_params = {}

cns.print(Rule())

profiler.add_function(simulate_vis)
profiler.add_function(_evaluate_vis_chunk)

init_time = time.time()

str_id = get_label(
analytic_beam=analytic_beam,
nfreq=nfreq,
ntimes=ntimes,
nants=nants,
nsource=nsource,
nbeams=1,
matprod_method='',
gpu=False,
double_precision=double_precision,
coord_method=coord_method,
naz=naz,
nza=nza,
)

cProfile.runctx(
"""simulate_vis(
ants=ants,
fluxes=flux,
ra=ra,
dec=dec,
freqs=freqs,
times=times.jd,
beam=cpu_beams[0],
polarized=True,
precision=2 if double_precision else 1,
telescope_loc=get_telescope("hera").location,
nprocesses=nprocesses,
coord_method_params=coord_params,
force_use_ray=force_use_ray,
trace_mem=trace_mem,
beam_spline_opts={'order': beam_spline_order},
)""",
globals(),
locals(),
str_id
)

out_time = time.time()
cns.print("TOTAL TIME: ", out_time - init_time)

p = pstats.Stats(str_id)
p.sort_stats("cumulative").print_stats(50)
os.system(f"flameprof --format=log {str_id} > {str_id}.flame")


if __name__=="__main__":
app()
30 changes: 25 additions & 5 deletions fftvis/logutils.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import logging
import time
import tracemalloc as tm
import psutil


logger = logging.getLogger(__name__)
Expand All @@ -26,13 +27,31 @@ def human_readable_size(size, decimal_places=2, indicate_sign=False):
else:
return f"{size:.{decimal_places}f} {unit}"


def printmem(pr: psutil.Process, msg: str=""):
"""Print memory usage of the process."""
#if logger.isEnabledFor(logging.INFO):
info = pr.memory_info()

rss = info.rss
try:
shm = info.shared
except AttributeError:
# On macos there is no shared memory
shm = 0
used = rss - shm

shm = human_readable_size(shm)
used = human_readable_size(used)

#logger.info(f"{msg} Memory Usage [{pr.pid}]: {used} internal, {shm} shared.")
print(f"{msg} Memory Usage [{pr.pid}]: {used} internal, {shm} shared.")

def memtrace(highest_peak) -> int:
if logger.isEnabledFor(logging.INFO):
cm, pm = tm.get_traced_memory()
logger.info(f"Starting Memory usage : {cm / 1024**3:.3f} GB")
logger.info(f"Starting Peak Mem usage: {pm / 1024**3:.3f} GB")
logger.info(f"Traemalloc Peak Memory (tot)(GB): {highest_peak / 1024**3:.2f}")
logger.info(f"Tracemalloc Peak Memory (tot)(GB): {highest_peak / 1024**3:.2f}")
tm.reset_peak()
return max(pm, highest_peak)

Expand All @@ -48,9 +67,10 @@ def log_progress(start_time, prev_time, iters, niters, pr, last_mem):
per_iter = total / iters
expected = per_iter * niters

rss = pr.memory_info().rss
mem = human_readable_size(rss)
memdiff = human_readable_size(rss - last_mem, indicate_sign=True)
info = pr.memory_info()
used = info.rss - info.shared
mem = human_readable_size(used)
memdiff = human_readable_size(used - last_mem, indicate_sign=True)

logger.info(
f"""
Expand Down
Loading
Loading