diff --git a/.github/workflows/build.yaml b/.github/workflows/build.yaml
index df698dd..18041cc 100644
--- a/.github/workflows/build.yaml
+++ b/.github/workflows/build.yaml
@@ -35,7 +35,7 @@ jobs:
       - name: Determine the image tag
         run: |
-          docker run "lsstsqre/centos:7-stack-lsst_distrib-$STACK_TAG" bash -c "cat stack/miniconda*/ups_db/global.tags" > eups.tag
+          docker run "lsstsqre/centos:7-stack-lsst_distrib-$STACK_TAG" bash -c "cat conda/envs/lsst-scipipe-*/share/eups/ups_db/global.tags" > eups.tag
           echo "eups tag = $(< eups.tag)"
           echo "IMAGE_TAG=$(< eups.tag)" >> $GITHUB_ENV
diff --git a/Dockerfile b/Dockerfile
index 6ab21f0..ef69c85 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -1,7 +1,8 @@
-ARG STACK_TAG="w_2024_30"
+ARG STACK_TAG="w_latest"
 FROM lsstsqre/centos:7-stack-lsst_distrib-${STACK_TAG}
 USER root
 RUN <
[...]
-    output_lines.append(f"")
+    raws = {r.id: r.group for r in raw_exposures}
+    log_group_detector = {
+        (raws[visit], detector) for visit, detector in log_visit_detector
+    }
+    df = get_status_code_from_loki(day_obs)
+    df = df[(df["instrument"] == "LATISS") & (df["group"].isin(raws.values()))]
+
+    status_groups = df.set_index(["group", "detector"]).groupby("code").groups
+    for code in status_groups:
+        counts = len(status_groups[code])
+        output_lines.append(f"- {counts} counts have status code {code}.")
+
+        indices = status_groups[code].intersection(log_group_detector)
+        if not indices.empty and code != 200:
+            output_lines.append(f"  - {len(indices)} have outputs.")
+            counts -= len(indices)
+
+        match code:
+            case 500:
+                df = get_timeout_from_loki(day_obs)
+                df = df[
+                    (df["instrument"] == "LATISS") & (df["group"].isin(raws.values()))
+                ].set_index(["group", "detector"])
+                indices = status_groups[code].intersection(df.index)
+                if not indices.empty:
+                    output_lines.append(f"  - {len(indices)} timed out.")
+                    counts -= len(indices)
+        if counts > 0:
+            output_lines.append(f"  - {counts} to be investigated.")
+
+    output_lines.append(
+        f""
+    )

     return "\n".join(output_lines)


 if __name__ == "__main__":
-    url = os.getenv("SLACK_WEBHOOK_URL")
     day_obs = date.today() - timedelta(days=1)
     day_obs_string = day_obs.strftime("%Y-%m-%d")
     summary = make_summary_message(day_obs_string)
-    output_message = f"*LATISS {day_obs.strftime('%A %Y-%m-%d')}*\n" + summary
+    output_message = (
+        f":clamps: *LATISS {day_obs.strftime('%A %Y-%m-%d')}* :clamps: \n" + summary
+    )

     if not url:
         print("Must set environment variable SLACK_WEBHOOK_URL in order to post")
@@ -84,10 +195,9 @@ def make_summary_message(day_obs):
         sys.exit(1)

     res = requests.post(
-        url, headers={"Content-Type": "application/json"},
-        json={"text": output_message}
-    )
+        url, headers={"Content-Type": "application/json"}, json={"text": output_message}
+    )

-    if(res.status_code != 200):
+    if res.status_code != 200:
         print("Failed to send message")
         print(res)
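The bookkeeping in `make_summary_message` leans on a pandas idiom that is easy to misread, so here is a minimal, self-contained sketch of it with made-up group IDs (not from the repo): `groupby("code").groups` maps each status code to a `MultiIndex` of `(group, detector)` labels, which is then intersected with the set of pairs that actually produced outputs.

```python
import pandas

# Fabricated stand-in for the status-code table the script builds from Loki.
df = pandas.DataFrame(
    {
        "group": ["g1", "g2", "g3"],
        "detector": [0, 0, 0],
        "code": [200, 200, 500],
    }
)
# Hypothetical (group, detector) pairs that produced output datasets.
log_group_detector = {("g1", 0)}

# groupby(...).groups: a dict mapping each code to a MultiIndex of labels.
status_groups = df.set_index(["group", "detector"]).groupby("code").groups
for code in status_groups:
    counts = len(status_groups[code])
    # MultiIndex.intersection accepts a plain set of tuples.
    indices = status_groups[code].intersection(log_group_detector)
    print(f"code {code}: {counts} total, {len(indices)} with outputs")
# -> code 200: 2 total, 1 with outputs
# -> code 500: 1 total, 0 with outputs
```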
diff --git a/scripts/queries.py b/scripts/queries.py
new file mode 100644
index 0000000..65d8489
--- /dev/null
+++ b/scripts/queries.py
@@ -0,0 +1,214 @@
+# This file is part of nightly-reporting-jobs.
+#
+# Developed for the LSST Data Management System.
+# This product includes software developed by the LSST Project
+# (https://www.lsst.org).
+# See the COPYRIGHT file at the top-level directory of this distribution
+# for details of code ownership.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program.  If not, see <https://www.gnu.org/licenses/>.
+
+__all__ = [
+    "get_next_visit_events",
+    "get_status_code_from_loki",
+    "get_timeout_from_loki",
+]
+
+import logging
+import json
+import re
+import subprocess
+
+from astropy.time import Time, TimeDelta
+import pandas
+
+from lsst_efd_client import EfdClient
+
+logging.basicConfig(
+    format="{levelname} {asctime} {name} - {message}",
+    style="{",
+)
+_log = logging.getLogger(__name__)
+_log.setLevel(logging.DEBUG)
+
+
+def get_start_end(day_obs):
+    """Return the start and end times of a day_obs.
+
+    Parameters
+    ----------
+    day_obs : `str`
+        day_obs in the format of YYYY-MM-DD.
+    """
+    start = Time(day_obs, scale="utc", format="isot") + TimeDelta(
+        12 * 60 * 60, format="sec"
+    )
+    end = start + TimeDelta(1, format="jd")
+    return start, end
+
+
+async def get_next_visit_events(day_obs, sal_index, survey):
+    """Obtain the uncanceled nextVisit events.
+
+    Parameters
+    ----------
+    day_obs : `str`
+        day_obs in the format of YYYY-MM-DD.
+    sal_index : `int`
+        Index of the Script SAL component, used as a proxy for the
+        instrument. TODO: just use the instrument.
+    survey : `str`
+        The imaging survey name of interest.
+    """
+    client = EfdClient("usdf_efd")
+
+    topic = "lsst.sal.ScriptQueue.logevent_nextVisit"
+    start, end = get_start_end(day_obs)
+    df = await client.select_time_series(topic, ["*"], start.utc, end.utc)
+    canceled = await client.select_time_series(
+        topic + "Canceled", ["*"], start.utc, end.utc
+    )
+
+    if df.empty:
+        _log.info(f"No events on {day_obs}")
+        return pandas.DataFrame()
+
+    # Only select on-sky exposures from the selected survey
+    df = df.loc[
+        (df["coordinateSystem"] == 2)
+        & (df["salIndex"] == sal_index)
+        & (df["survey"] == survey)
+    ].set_index("groupId")
+    _log.info(f"There were {len(df)} {survey} nextVisit events on {day_obs}")
+
+    # Ignore the explicitly canceled groups
+    if not canceled.empty:
+        canceled = df.index.intersection(canceled.set_index("groupId").index).tolist()
+        if canceled:
+            _log.info(f"{len(canceled)} events were canceled {canceled}")
+            df = df.drop(canceled)
+
+    return df
+
+
+def query_loki(day_obs, pod_name, search_string):
+    """Query Grafana Loki for log records.
+
+    Parameters
+    ----------
+    day_obs : `str`
+        day_obs in the format of YYYY-MM-DD.
+    pod_name : `str`
+        Prefix of the pod names to match in the Loki stream selector.
+    search_string : `str`
+        LogQL filter expression appended to the stream selector.
+    """
+    start, end = get_start_end(day_obs)
+    command = [
+        "logcli",
+        "query",
+        "--output=jsonl",
+        "--tls-skip-verify",
+        "--addr=http://sdfloki.slac.stanford.edu:80",
+        "--timezone=UTC",
+        "-q",
+        "--limit=10000",
+        "--proxy-url=http://sdfproxy.sdf.slac.stanford.edu:3128",
+        f'--from={start.strftime("%Y-%m-%dT%H:%M:%SZ")}',
+        f'--to={end.strftime("%Y-%m-%dT%H:%M:%SZ")}',
+        f'{{app="vcluster--usdf-prompt-processing",pod=~"{pod_name}-.+"}} {search_string}',
+    ]
+
+    result = subprocess.run(command, capture_output=True, text=True)
+    if result.returncode != 0:
+        _log.error("Loki query failed")
+        _log.error(result.stderr)
+        return
+
+    return result.stdout
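A note on the window `query_loki` passes to logcli: `get_start_end` pins a day_obs to the observatory convention that the observing day rolls over at 12:00 UTC, so the query span is noon-to-noon rather than midnight-to-midnight. A standalone check (dates arbitrary):

```python
from astropy.time import Time, TimeDelta

# day_obs "2024-07-01" covers 2024-07-01T12:00 to 2024-07-02T12:00 UTC, so an
# exposure taken at 03:00 UTC on July 2 still belongs to day_obs 2024-07-01.
start = Time("2024-07-01", scale="utc", format="isot") + TimeDelta(
    12 * 60 * 60, format="sec"
)
end = start + TimeDelta(1, format="jd")
print(start.isot)  # 2024-07-01T12:00:00.000
print(end.isot)    # 2024-07-02T12:00:00.000
```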
+def get_status_code_from_loki(day_obs):
+    """Get status return codes from next-visit-fan-out.
+
+    Parameters
+    ----------
+    day_obs : `str`
+        day_obs in the format of YYYY-MM-DD.
+
+    Returns
+    -------
+    df : `pandas.DataFrame`
+    """
+    results = query_loki(
+        day_obs,
+        pod_name="next-visit-fan-out",
+        search_string='|~ "status code" |~ "for initial request"',
+    )
+
+    if not results:
+        return pandas.DataFrame(
+            columns=["instrument", "group", "detector", "code", "timestamp"]
+        )
+
+    pattern = re.compile(
+        r".*nextVisit {'instrument': '(?P<instrument>\w*)', 'groupId': '(?P<group>[^' ]*)', 'detector': (?P<detector>\d*)} status code (?P<code>\d*) for.*timestamp\":\"(?P<timestamp>\S*)\""
+    )
+    records = []
+    for line in results.splitlines():
+        m1 = pattern.match(line)
+        if m1:
+            records.append(
+                (
+                    m1["instrument"],
+                    m1["group"],
+                    int(m1["detector"]),
+                    int(m1["code"]),
+                    m1["timestamp"],
+                )
+            )
+    df = pandas.DataFrame.from_records(
+        data=records, columns=["instrument", "group", "detector", "code", "timestamp"]
+    )
+    return df
+
+
+def get_timeout_from_loki(day_obs):
+    """Get the IDs of the timed-out cases.
+
+    Parameters
+    ----------
+    day_obs : `str`
+        day_obs in the format of YYYY-MM-DD.
+
+    Returns
+    -------
+    df : `pandas.DataFrame`
+    """
+    results = query_loki(
+        day_obs,
+        pod_name="prompt-proto-service",
+        search_string='|~ "Timed out waiting for image after receiving exposures"',
+    )
+
+    if not results:
+        return pandas.DataFrame(columns=["instrument", "group", "detector", "ts"])
+
+    parsed_data = []
+    for result in results.splitlines():
+        try:
+            data = json.loads(result)
+            parsed_data.append(data)
+        except json.JSONDecodeError as e:
+            _log.error(f"Failed to parse \n{result}\n JSON decode error: {e}")
+
+    df = pandas.json_normalize(parsed_data)
+    df = df.merge(
+        pandas.json_normalize(df["line"].apply(json.loads)),
+        left_index=True,
+        right_index=True,
+    ).drop(columns=["line"])
+
+    return df
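Finally, the `json_normalize` step at the end of `get_timeout_from_loki` is worth illustrating: each JSONL record from logcli carries the raw log message as a JSON string in its `line` field, which gets parsed, flattened, and joined back onto the outer record. A self-contained sketch on fabricated records (the field values are invented; only the `timestamp`/`line` shape is assumed to match logcli's output):

```python
import json

import pandas

# Two fabricated logcli-style JSONL records; the "line" payload is itself JSON.
results = "\n".join(
    json.dumps(
        {
            "timestamp": ts,
            "line": json.dumps(
                {"instrument": "LATISS", "group": group, "detector": 0}
            ),
        }
    )
    for ts, group in [
        ("2024-07-02T03:14:15Z", "2024-07-02T03:10:00"),
        ("2024-07-02T03:20:00Z", "2024-07-02T03:16:00"),
    ]
)

parsed_data = [json.loads(line) for line in results.splitlines()]
df = pandas.json_normalize(parsed_data)
# Parse the nested "line" JSON, flatten it, and join it back on the row index.
df = df.merge(
    pandas.json_normalize(df["line"].apply(json.loads)),
    left_index=True,
    right_index=True,
).drop(columns=["line"])
print(df[["timestamp", "instrument", "group", "detector"]])
```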