-
Notifications
You must be signed in to change notification settings - Fork 6
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
6 changed files
with
273 additions
and
63 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,96 @@ | ||
""" | ||
Code for interacting with MOA light curve files and metadata. | ||
""" | ||
from __future__ import annotations | ||
|
||
from collections import defaultdict | ||
from pathlib import Path | ||
from typing import ClassVar | ||
|
||
import pandas as pd | ||
import requests | ||
from bs4 import BeautifulSoup | ||
|
||
|
||
class MoaDataInterface2M:
    """
    A class for interacting with MOA light curve files and metadata.

    Lazily builds and caches a mapping from survey tag (e.g. "c", "v", "n") to the
    list of light curve file paths carrying that tag, based on Takahiro Sumi's
    9-year candidate events table.
    """

    # Site-specific default locations of the events table and the light curve files.
    # NOTE(review): absolute cluster paths — confirm they match the deployment environment.
    nine_year_events_table_path: ClassVar[Path] = Path(
        "/local/data/fugu3/sishitan/qusi_project/qusi/data/moa_microlensing_550k/candlist_2023Oct12.txt"
    )
    light_curve_root_directory: ClassVar[Path] = Path(
        "/local/data/fugu3/sishitan/qusi_project/merida/data/microlensing_2M"
    )

    # Cache for the tag-to-paths dictionary; populated on first property access.
    survey_tag_to_path_list_dictionary_: dict[str, list[Path]] | None = None
    # Sentinel tag used for light curves with no entry in the events table.
    no_tag_string = "no_tag"
    all_survey_tags: ClassVar[list[str]] = ["c", "cf", "cp", "cw", "cs", "cb", "v", "n", "nr", "m", "j", no_tag_string]

    @property
    def survey_tag_to_path_list_dictionary(self) -> dict[str, list[Path]]:
        """
        Property allowing the survey tag to path list dictionary to only be loaded once.

        :return: The survey tag to path list dictionary.
        """
        if self.survey_tag_to_path_list_dictionary_ is None:
            takahiro_sumi_nine_year_events_data_frame = self.read_takahiro_sumi_nine_year_events_table_as_data_frame(
                self.nine_year_events_table_path
            )
            light_curve_paths = list(self.light_curve_root_directory.glob("**/*.feather"))
            self.survey_tag_to_path_list_dictionary_ = self.group_paths_by_tag_in_events_data_frame(
                light_curve_paths, takahiro_sumi_nine_year_events_data_frame
            )
        return self.survey_tag_to_path_list_dictionary_

    @staticmethod
    def read_takahiro_sumi_nine_year_events_table_as_data_frame(path: Path) -> pd.DataFrame:
        """
        Reads Takahiro Sumi's 9-year events table as a Pandas data frame.

        :param path: The path to the events table file.
        :return: The data frame, indexed (and sorted) by (field, clr, chip, subfield, id).
        """
        named_column_names = ["field", "clr", "chip", "subfield", "id", "tag", "x", "y"]
        # The number of columns in the file are inconsistent, so here we add extra unnamed columns to match the
        # largest number of columns in any row.
        largest_column_count = 33
        unnamed_column_names = [f"unnamed{index}" for index in range(largest_column_count - len(named_column_names))]
        column_names = named_column_names + unnamed_column_names
        # `sep=r"\s+"` replaces the deprecated `delim_whitespace=True` (removed in pandas 3.0);
        # the two forms are equivalent per the pandas documentation.
        data_frame = pd.read_csv(
            path, comment="#", names=column_names, sep=r"\s+", skipinitialspace=True, skiprows=23
        )
        # Index on the identifier columns (kept as regular columns too via drop=False) for fast lookup.
        data_frame = data_frame.set_index(["field", "clr", "chip", "subfield", "id"], drop=False)
        data_frame = data_frame.sort_index()
        return data_frame

    def get_tag_for_path_from_data_frame(self, path: Path, events_data_frame: pd.DataFrame) -> str:
        """
        Gets the event tag of a light curve from the events data frame.

        :param path: The path of the light curve whose event tag should be retrieved.
        :param events_data_frame: Takahiro Sumi's 9-year events data frame.
        :return: The string of the tag of the event, or `no_tag_string` if no tag exists.
        """
        file_name = path.name
        file_name_without_extension = file_name.split(".")[0]
        # File names end with a MOA identifier of the form `field-clr-chip-subfield-id`;
        # splitting on "_" removes the duplicate identifier prefix string.
        moa_identifier = file_name_without_extension.split("_")[-1]
        field, clr, chip_string, subfield_string, id_string = moa_identifier.split("-")
        chip, subfield, id_ = int(chip_string), int(subfield_string), int(id_string)
        try:
            row = events_data_frame.loc[(field, clr, chip, subfield, id_)]
        except KeyError:
            return self.no_tag_string
        # NOTE(review): assumes the (field, clr, chip, subfield, id) index key is unique;
        # a duplicated key would make `row["tag"]` a Series rather than a string — confirm.
        tag = row["tag"]
        return tag

    def group_paths_by_tag_in_events_data_frame(
        self, paths: list[Path], events_data_frame: pd.DataFrame
    ) -> dict[str, list[Path]]:
        """
        Groups paths into a dictionary based on their tags.

        :param paths: The paths to group.
        :param events_data_frame: The events data frame to look for a tag in.
        :return: A dictionary mapping each tag to the list of paths carrying that tag.
        """
        tag_path_list_dictionary = defaultdict(list)
        for path in paths:
            tag = self.get_tag_for_path_from_data_frame(path, events_data_frame)
            tag_path_list_dictionary[tag].append(path)
        return tag_path_list_dictionary
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,108 @@ | ||
from __future__ import annotations | ||
|
||
import re | ||
import shutil | ||
import socket | ||
from pathlib import Path | ||
from typing import TYPE_CHECKING | ||
|
||
import pandas as pd | ||
import scipy.stats | ||
from filelock import FileLock | ||
|
||
from moa_data_interface_2M import MoaDataInterface2M | ||
from ramjet.photometric_database.light_curve_collection import LightCurveCollection | ||
|
||
if TYPE_CHECKING: | ||
from collections.abc import Iterable | ||
|
||
import numpy as np | ||
|
||
|
||
class MoaSurveyLightCurveCollection2M(LightCurveCollection):
    """
    A collection of light curves based on the MOA 9-year survey.
    """

    # Class-level so the interface's tag-to-path dictionary cache is shared
    # across all collection instances.
    moa_data_interface = MoaDataInterface2M()

    def __init__(
        self,
        survey_tags: list[str],
        dataset_splits: list[int] | None = None,
        label: float | list[float] | np.ndarray | None = None,
    ):
        """
        :param survey_tags: The survey tags whose light curves belong to this collection.
        :param dataset_splits: The dataset splits to include; None includes all splits.
        :param label: The label(s) to apply to this collection's light curves.
        """
        super().__init__()
        self.label = label
        self.survey_tags: list[str] = survey_tags
        self.dataset_splits: list[int] | None = dataset_splits

    def get_paths(self) -> Iterable[Path]:
        """
        Gets the paths for the light curves in the collection.

        :return: An iterable of the light curve paths.
        """
        paths: list[Path] = []
        for tag in self.survey_tags:
            tag_paths = self.moa_data_interface.survey_tag_to_path_list_dictionary[tag]
            if self.dataset_splits is not None:
                # Split on each tag, so that the splitting remains across collections with different tag selections.
                tag_paths = self.shuffle_and_split_paths(tag_paths, self.dataset_splits)
            paths.extend(tag_paths)
        return paths

    def move_path_to_nvme(self, path: Path) -> Path:
        """
        Mirrors the file at the given path onto the local NVMe scratch drive when running
        on a `gpu###` cluster node, returning the scratch path; on any other host, returns
        the original path unchanged.

        :param path: The path of the file to mirror.
        :return: The NVMe path when on a gpu node, otherwise the original path.
        """
        match = re.match(r"gpu\d{3}", socket.gethostname())
        if match is not None:
            nvme_path = Path("/lscratch/golmsche").joinpath(path)
            if not nvme_path.exists():
                nvme_path.parent.mkdir(exist_ok=True, parents=True)
                # A file lock prevents multiple processes from copying the same file at once.
                nvme_lock_path = nvme_path.parent.joinpath(nvme_path.name + ".lock")
                lock = FileLock(str(nvme_lock_path))
                with lock.acquire():
                    if not nvme_path.exists():  # Re-check after acquiring the lock.
                        # Copy to a temporary name, then rename, so readers never see a
                        # partially-copied file.
                        nvme_tmp_path = nvme_path.parent.joinpath(nvme_path.name + ".tmp")
                        shutil.copy(path, nvme_tmp_path)
                        nvme_tmp_path.rename(nvme_path)
            return nvme_path
        return path

    def load_times_and_fluxes_from_path(self, path: Path) -> tuple[np.ndarray, np.ndarray]:
        """
        Loads the times and fluxes from a given light curve path.

        :param path: The path to the light curve file.
        :return: The times and the fluxes of the light curve.
        """
        path = self.move_path_to_nvme(path)
        light_curve_dataframe = pd.read_feather(path)
        times = light_curve_dataframe["HJD"].values
        fluxes = light_curve_dataframe["flux"].values
        return times, fluxes

    def load_times_and_magnifications_from_path(self, path: Path) -> tuple[np.ndarray, np.ndarray]:
        """
        Loads the times and magnifications from a given path as an injectable signal.

        :param path: The path to the light curve/signal file.
        :return: The times and the magnifications of the light curve/signal.
        """
        # `load_times_and_fluxes_from_path` performs the NVMe move itself, so no
        # separate `move_path_to_nvme` call is needed here.
        times, fluxes = self.load_times_and_fluxes_from_path(path)
        magnifications, times = self.generate_synthetic_signal_from_real_data(fluxes, times)
        return times, magnifications

    @staticmethod
    def generate_synthetic_signal_from_real_data(
        fluxes: np.ndarray, times: np.ndarray
    ) -> tuple[np.ndarray, np.ndarray]:
        """
        Takes real light curve data and converts it to a form that can be used for synthetic light curve injection.

        :param fluxes: The real light curve fluxes.
        :param times: The real light curve times.
        :return: Fake synthetic magnifications and times.
        """
        # Scale by the median absolute deviation so the signal has a consistent spread.
        # NOTE(review): a constant flux array yields a zero MAD and hence division by
        # zero — confirm inputs always vary.
        flux_median_absolute_deviation = scipy.stats.median_abs_deviation(fluxes)
        normalized_fluxes = (fluxes / flux_median_absolute_deviation) * 0.25
        return normalized_fluxes, times
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.