
DM-45576: Start a nightly cronjob to post a brief summary #3

Merged
merged 9 commits on Sep 28, 2024
2 changes: 1 addition & 1 deletion .github/workflows/build.yaml
@@ -35,7 +35,7 @@ jobs:

- name: Determine the image tag
run: |
docker run "lsstsqre/centos:7-stack-lsst_distrib-$STACK_TAG" bash -c "cat stack/miniconda*/ups_db/global.tags" > eups.tag
docker run "lsstsqre/centos:7-stack-lsst_distrib-$STACK_TAG" bash -c "cat conda/envs/lsst-scipipe-*/share/eups/ups_db/global.tags" > eups.tag
echo "eups tag = $(< eups.tag)"
echo "IMAGE_TAG=$(< eups.tag)" >> $GITHUB_ENV

8 changes: 7 additions & 1 deletion Dockerfile
@@ -1,11 +1,17 @@
ARG STACK_TAG="w_2024_30"
ARG STACK_TAG="w_latest"
FROM lsstsqre/centos:7-stack-lsst_distrib-${STACK_TAG}
USER root
RUN <<EOT
set -ex
curl -O -L https://github.com/grafana/loki/releases/download/v2.9.9/logcli-2.9.9.x86_64.rpm
rpm -i logcli-2.9.9.x86_64.rpm
rm logcli-2.9.9.x86_64.rpm
EOT
USER lsst
WORKDIR /
COPY scripts scripts/
RUN <<EOT
set -ex
source /opt/lsst/software/stack/loadLSST.bash
pip install lsst-efd-client
EOT
178 changes: 144 additions & 34 deletions scripts/prompt_processing_summary.py
@@ -1,10 +1,17 @@
import asyncio
import sys
import os
import lsst.daf.butler as dafButler
from dataclasses import dataclass
from datetime import date, timedelta
import requests

from queries import (
get_next_visit_events,
get_status_code_from_loki,
get_timeout_from_loki,
)


def make_summary_message(day_obs):
"""Make Prompt Processing summary message for a night
@@ -19,63 +26,167 @@ def make_summary_message(day_obs):

day_obs_int = int(day_obs.replace("-", ""))

b = dafButler.Butler("/repo/embargo", collections="LATISS/raw/all")
raw_visit_detector = set(
[
(x.dataId["exposure"], x.dataId["detector"])
for x in b.registry.queryDatasets(
"raw",
where=f"exposure.day_obs={day_obs_int} AND exposure.observation_type='science'",
)
]
survey = "BLOCK-306"
next_visits = asyncio.run(get_next_visit_events(day_obs, 2, survey))

butler_nocollection = dafButler.Butler("/repo/embargo")
raw_exposures = butler_nocollection.query_dimension_records(
"exposure",
instrument="LATISS",
where=f"day_obs={day_obs_int} AND exposure.can_see_sky",
explain=False,
)

# Do not send message if there are no on-sky exposures.
if len(raw_exposures) == 0:
sys.exit(0)

output_lines.append("Number of on-sky exposures: {:d}".format(len(raw_exposures)))

raw_exposures = butler_nocollection.query_dimension_records(
"exposure",
instrument="LATISS",
where=f"day_obs=day_obs_int AND exposure.science_program IN (survey)",
bind={"day_obs_int": day_obs_int, "survey": survey},
explain=False,
)
output_lines.append("Number of science raws: {:d}".format(len(raw_visit_detector)))

if len(raw_visit_detector) == 0:
output_lines.append(
f"Number for {survey}: {len(next_visits)} uncanceled nextVisit, {len(raw_exposures):d} raws"
)

if len(raw_exposures) == 0:
return "\n".join(output_lines)

butler_nocollection = dafButler.Butler("/repo/embargo")
try:
collections = butler_nocollection.registry.queryCollections(
collections = butler_nocollection.collections.query(
f"LATISS/prompt/output-{day_obs:s}"
)
collection = list(collections)[0]
except dafButler.registry.MissingCollectionError:
except dafButler.MissingCollectionError:
output_lines.append(f"No output collection was found for {day_obs:s}")
return "\n".join(output_lines)

b = dafButler.Butler("/repo/embargo", collections=[collection, "LATISS/defaults"])
sfm_counts = len(
butler_nocollection.query_datasets(
"isr_log",
collections=f"LATISS/prompt/output-{day_obs:s}/SingleFrame*",
where=f"exposure.science_program IN (survey)",
bind={"survey": survey},
find_first=False,
explain=False,
)
)
dia_counts = len(
butler_nocollection.query_datasets(
"isr_log",
collections=f"LATISS/prompt/output-{day_obs:s}/ApPipe*",
where=f"exposure.science_program IN (survey)",
bind={"survey": survey},
find_first=False,
explain=False,
)
)

log_visit_detector = set([(x.dataId['exposure'], x.dataId['detector']) for x in b.registry.queryDatasets("isr_log")])
output_lines.append("Number of ISRs attempted: {:d}".format(len(log_visit_detector)))
b = dafButler.Butler("/repo/embargo", collections=[collection, "LATISS/defaults"])

pvi_visit_detector = set([(x.dataId['visit'], x.dataId['detector']) for x in b.registry.queryDatasets("initial_pvi")])
output_lines.append("Number of successful initial_pvi results: {:d}".format(len(pvi_visit_detector)))
log_visit_detector = set(
[
(x.dataId["exposure"], x.dataId["detector"])
for x in b.query_datasets(
"isr_log",
where=f"exposure.science_program IN (survey)",
bind={"survey": survey},
)
]
)
output_lines.append(
"Number of main pipeline runs: {:d} total, {:d} SingleFrame, {:d} ApPipe".format(
len(log_visit_detector), sfm_counts, dia_counts
)
)

missing_pvis = set(log_visit_detector - pvi_visit_detector)
missing_visits = [x[0] for x in missing_pvis]
output_lines.append("Number of unsuccessful processCcd attempts (no resulting initial_pvi): {:d}".format(len(missing_pvis)))
sfm_outputs = len(
b.query_datasets(
"initial_photometry_match_detector",
where=f"exposure.science_program IN (survey)",
bind={"survey": survey},
explain=False,
)
)
output_lines.append(
"- ProcessCcd: {:d} attempts, {:d} succeeded, {:d} failed.".format(
sfm_counts + dia_counts, sfm_outputs, sfm_counts + dia_counts - sfm_outputs
)
)

dia_visit_detector = set([(x.dataId['visit'], x.dataId['detector']) for x in b.registry.queryDatasets("apdb_marker")])
output_lines.append("Number of successful DIA attempted: {:d}".format(len(dia_visit_detector)))
dia_visit_detector = set(
[
(x.dataId["visit"], x.dataId["detector"])
for x in b.query_datasets(
"apdb_marker",
where=f"exposure.science_program IN (survey)",
bind={"survey": survey},
explain=False,
)
]
)
output_lines.append(
"- ApPipe: {:d} attempts, {:d} succeeded, {:d} failed.".format(
dia_counts, len(dia_visit_detector), dia_counts - len(dia_visit_detector)
)
)

missing_dias = set(log_visit_detector - dia_visit_detector)
missing_visits = [x[0] for x in missing_dias]
output_lines.append("Number of unsuccessful DIA attempts (no resulting apdb_marker): {:d}".format(len(missing_dias)))
output_lines.append(
f"<https://usdf-rsp-dev.slac.stanford.edu/times-square/github/lsst-dm/vv-team-notebooks/PREOPS-prompt-error-msgs?day_obs={day_obs}&instrument=LATISS&ts_hide_code=1|Full Error Log>"
)

output_lines.append(f"<https://usdf-rsp-dev.slac.stanford.edu/times-square/github/lsst-dm/vv-team-notebooks/PREOPS-prompt-error-msgs?day_obs={day_obs}&instrument=LATISS&ts_hide_code=1|Full Error Log>")
raws = {r.id: r.group for r in raw_exposures}
log_group_detector = {
(raws[visit], detector) for visit, detector in log_visit_detector
}
df = get_status_code_from_loki(day_obs)
df = df[(df["instrument"] == "LATISS") & (df["group"].isin(raws.values()))]

status_groups = df.set_index(["group", "detector"]).groupby("code").groups
for code in status_groups:
counts = len(status_groups[code])
output_lines.append(f"- {counts} counts have status code {code}.")

indices = status_groups[code].intersection(log_group_detector)
if not indices.empty and code != 200:
output_lines.append(f" - {len(indices)} have outputs.")
counts -= len(indices)

match code:
case 500:
df = get_timeout_from_loki(day_obs)
df = df[
(df["instrument"] == "LATISS") & (df["group"].isin(raws.values()))
].set_index(["group", "detector"])
indices = status_groups[code].intersection(df.index)
if not indices.empty:
output_lines.append(f" - {len(indices)} timed out.")
counts -= len(indices)
if counts > 0:
output_lines.append(f" - {counts} to be investigated.")

output_lines.append(
f"<https://usdf-rsp-dev.slac.stanford.edu/times-square/github/lsst-sqre/times-square-usdf/prompt-processing/groups?date={day_obs}&instrument=LATISS&survey={survey}&mode=DEBUG&ts_hide_code=1|Timing plots>"
)

Member: What does &mode=DEBUG do?

Contributor Author: In that Times Square notebook, written before we started making this reporting tool, I wanted a "report" mode versus a "debug" mode. The former is brief and meant for people who don't care about PP internals; the latter has far more detail that the PP team can use to work out what might have gone wrong. Now that we have this reporting tool, I should probably go back and reconsider whether the split still makes sense.

return "\n".join(output_lines)


if __name__ == "__main__":

url = os.getenv("SLACK_WEBHOOK_URL")

day_obs = date.today() - timedelta(days=1)
day_obs_string = day_obs.strftime("%Y-%m-%d")
summary = make_summary_message(day_obs_string)
output_message = f"*LATISS {day_obs.strftime('%A %Y-%m-%d')}*\n" + summary
output_message = (
f":clamps: *LATISS {day_obs.strftime('%A %Y-%m-%d')}* :clamps: \n" + summary
)

if not url:
print("Must set environment variable SLACK_WEBHOOK_URL in order to post")
@@ -84,10 +195,9 @@ def make_summary_message(day_obs):
sys.exit(1)

res = requests.post(
url, headers={"Content-Type": "application/json"},
json={"text": output_message}
)
url, headers={"Content-Type": "application/json"}, json={"text": output_message}
)

if(res.status_code != 200):
if res.status_code != 200:
print("Failed to send message")
print(res)
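
A note on the helpers imported from queries (get_next_visit_events, get_status_code_from_loki, get_timeout_from_loki): that module is not part of this diff. As a minimal sketch only — assuming Loki's standard HTTP range-query endpoint, a LOKI_URL environment variable, a hypothetical {app="prompt-processing"} stream selector, and hypothetical instrument/group/detector/code labels, none of which are confirmed by this PR — a status-code helper could look like:

import os
from datetime import datetime, timedelta, timezone

import pandas as pd
import requests


def get_status_code_from_loki(day_obs: str) -> pd.DataFrame:
    """Sketch of a Loki status-code query; not the repository's implementation."""
    # day_obs is "YYYY-MM-DD"; query the 24 hours starting at UTC midnight.
    start = datetime.fromisoformat(day_obs).replace(tzinfo=timezone.utc)
    end = start + timedelta(days=1)
    resp = requests.get(
        os.environ["LOKI_URL"] + "/loki/api/v1/query_range",
        params={
            "query": '{app="prompt-processing"} |= "status code"',  # assumed selector
            "start": int(start.timestamp() * 1e9),  # Loki takes nanosecond epochs
            "end": int(end.timestamp() * 1e9),
            "limit": 5000,
        },
        timeout=60,
    )
    resp.raise_for_status()
    # One row per stream; the labels are assumed to carry the fields the
    # summary script indexes on.
    rows = [
        {
            "instrument": s["stream"].get("instrument"),
            "group": s["stream"].get("group"),
            "detector": int(s["stream"].get("detector", -1)),
            "code": int(s["stream"].get("code", -1)),
        }
        for s in resp.json()["data"]["result"]
    ]
    return pd.DataFrame(rows, columns=["instrument", "group", "detector", "code"])

The summary script only relies on the returned frame having instrument, group, detector, and code columns, so the real helper could equally be built on the logcli binary the Dockerfile installs; this HTTP-API version is just one way to produce a frame of that shape.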