diff --git a/ooniapi/common/src/common/routers.py b/ooniapi/common/src/common/routers.py index 305d3079..5413f7ce 100644 --- a/ooniapi/common/src/common/routers.py +++ b/ooniapi/common/src/common/routers.py @@ -18,3 +18,6 @@ class BaseModel(PydandicBaseModel): date: lambda v: v.strftime(ISO_FORMAT_DATE), } ) + +class NotSupportedResponse(BaseModel): + mssg: str \ No newline at end of file diff --git a/ooniapi/services/oonimeasurements/src/oonimeasurements/errors.py b/ooniapi/services/oonimeasurements/src/oonimeasurements/errors.py new file mode 100644 index 00000000..e69de29b diff --git a/ooniapi/services/oonimeasurements/src/oonimeasurements/main.py b/ooniapi/services/oonimeasurements/src/oonimeasurements/main.py index b6201f7d..01d4fe79 100644 --- a/ooniapi/services/oonimeasurements/src/oonimeasurements/main.py +++ b/ooniapi/services/oonimeasurements/src/oonimeasurements/main.py @@ -8,7 +8,8 @@ from prometheus_fastapi_instrumentator import Instrumentator -from .routers import aggregation, measurements +from .routers.v1 import aggregation +from .routers.v1 import measurements from .dependencies import get_clickhouse_session from .common.dependencies import get_settings diff --git a/ooniapi/services/oonimeasurements/src/oonimeasurements/routers/aggregation.py b/ooniapi/services/oonimeasurements/src/oonimeasurements/routers/v1/aggregation.py similarity index 99% rename from ooniapi/services/oonimeasurements/src/oonimeasurements/routers/aggregation.py rename to ooniapi/services/oonimeasurements/src/oonimeasurements/routers/v1/aggregation.py index 36ab79f1..c084d986 100644 --- a/ooniapi/services/oonimeasurements/src/oonimeasurements/routers/aggregation.py +++ b/ooniapi/services/oonimeasurements/src/oonimeasurements/routers/v1/aggregation.py @@ -1,6 +1,5 @@ """ Aggregation API -The routes are mounted under /api """ from datetime import datetime, timedelta, date @@ -20,7 +19,7 @@ from oonimeasurements.common.clickhouse_utils import query_click, query_click_one_row from oonimeasurements.common.utils import jerror, commasplit, convert_to_csv -from ..dependencies import get_clickhouse_session +from ...dependencies import get_clickhouse_session router = APIRouter() diff --git a/ooniapi/services/oonimeasurements/src/oonimeasurements/routers/measurements.py b/ooniapi/services/oonimeasurements/src/oonimeasurements/routers/v1/measurements.py similarity index 88% rename from ooniapi/services/oonimeasurements/src/oonimeasurements/routers/measurements.py rename to ooniapi/services/oonimeasurements/src/oonimeasurements/routers/v1/measurements.py index 5672ef1c..28bfefe8 100644 --- a/ooniapi/services/oonimeasurements/src/oonimeasurements/routers/measurements.py +++ b/ooniapi/services/oonimeasurements/src/oonimeasurements/routers/v1/measurements.py @@ -1,6 +1,5 @@ """ Measurements API -The routes are mounted under /api """ from datetime import datetime, timedelta, timezone @@ -37,39 +36,48 @@ from urllib.request import urlopen from urllib.parse import urljoin, urlencode -from ..common.config import Settings -from ..common.dependencies import get_settings -from ..common.routers import BaseModel -from ..common.utils import setcacheresponse, commasplit, setnocacheresponse -from ..common.clickhouse_utils import query_click, query_click_one_row -from ..dependencies import get_clickhouse_session +from ...common.config import Settings +from ...common.dependencies import get_settings +from ...common.routers import BaseModel, NotSupportedResponse +from ...common.utils import setcacheresponse, commasplit, setnocacheresponse +from ...common.clickhouse_utils import query_click, query_click_one_row +from ...dependencies import get_clickhouse_session log = logging.getLogger(__name__) router = APIRouter() -FASTPATH_MSM_ID_PREFIX = "temp-fid-" -FASTPATH_SERVER = "fastpath.ooni.nu" -FASTPATH_PORT = 8000 - - urllib_pool = urllib3.PoolManager() -MsmtNotFound = HTTPException(status_code=500, detail="Measurement not found") +MeasurementNotFound = HTTPException(status_code=500, detail="Measurement not found") +ReportInputNotFound = HTTPException(status_code=500, details="Report and input not found") + +@router.get( + "/v1/files", + tags=["files"], + response_model=NotImplemented, +) +def list_files( + response: Response, +): + """List files - unsupported""" + setcacheresponse("1d", response) + return NotSupportedResponse(msg="not implemented") def measurement_uid_to_s3path_linenum(db: ClickhouseClient, measurement_uid: str): - # TODO: cleanup this - query = """SELECT s3path, linenum FROM jsonl + query = """ + SELECT s3path, linenum FROM jsonl PREWHERE (report_id, input) IN ( SELECT report_id, input FROM fastpath WHERE measurement_uid = :uid ) - LIMIT 1""" - query_params = dict(uid=measurement_uid) + LIMIT 1 + """ + query_params = dict(uid=measurement_uid) lookup = query_click_one_row(db, sql_text(query), query_params, query_prio=3) if lookup is None: - raise MsmtNotFound + raise MeasurementNotFound s3path = lookup["s3path"] linenum = lookup["linenum"] @@ -79,34 +87,19 @@ def measurement_uid_to_s3path_linenum(db: ClickhouseClient, measurement_uid: str def _fetch_jsonl_measurement_body_from_s3( s3path: str, linenum: int, + s3_bucket_name: str, ) -> bytes: - baseurl = f"https://{settings.s3_bucket_name}.s3.amazonaws.com/" + baseurl = f"https://{s3_bucket_name}.s3.amazonaws.com/" url = urljoin(baseurl, s3path) + log.info(f"Fetching {url}") r = urlopen(url) + f = gzip.GzipFile(fileobj=r, mode="r") for n, line in enumerate(f): if n == linenum: return line - - raise MsmtNotFound - - -class NotImplemented(BaseModel): - msg: str - - -@router.get( - "/v1/files", - tags=["files"], - response_model=NotImplemented, -) -def list_files( - response: Response, -): - """List files - unsupported""" - setcacheresponse("1d", response) - return NotImplemented(msg="not implemented") + raise MeasurementNotFound @router.get( @@ -117,20 +110,18 @@ def get_measurement( measurement_uid: str, download: bool, response: Response, - db=Depends(get_clickhouse_session) + db=Depends(get_clickhouse_session), + settings=Depends(get_settings) ): """ Get one measurement by measurement_id, Returns only the measurement without extra data from the database """ assert measurement_uid - try: - s3path, linenum = measurement_uid_to_s3path_linenum(db, measurement_uid) - except: - raise + s3path, linenum = measurement_uid_to_s3path_linenum(db, measurement_uid) log.debug(f"Fetching file {s3path} from S3") - body = _fetch_jsonl_measurement_body_from_s3(s3path, linenum) + body = _fetch_jsonl_measurement_body_from_s3(s3path, linenum, settings.s3_bucket_name) if download: response.headers["Content-Disposition"] = ( @@ -145,57 +136,24 @@ def get_measurement( ### Fetching measurement bodies -def report_id_input_to_s3path_linenum(db: ClickhouseClient, report_id: str, input: str): - query = """SELECT s3path, linenum FROM jsonl +def report_id_input_to_s3path_linenum(db: ClickhouseClient, report_id: str, input_: str): + query = """ + SELECT s3path, linenum FROM jsonl PREWHERE report_id = :report_id AND input = :inp - LIMIT 1""" - query_params = dict(inp=input, report_id=report_id) + LIMIT 1 + """ + query_params = dict(inp=input_, report_id=report_id) + lookup = query_click_one_row(db, sql_text(query), query_params, query_prio=3) - if lookup is None: - m = f"Missing row in jsonl table: {report_id} {input}" - log.error(m) - raise HTTPException + log.error(f"Missing row in jsonl table: {report_id} {input_}") + raise ReportInputNotFound s3path = lookup["s3path"] linenum = lookup["linenum"] return s3path, linenum -def _fetch_jsonl_measurement_body_clickhouse( - db: ClickhouseClient, - report_id: str, - input: Optional[str], - measurement_uid: Optional[str], -) -> Optional[bytes]: - """ - Fetch jsonl from S3, decompress it, extract single msmt - """ - # TODO: switch to _fetch_measurement_body_by_uid - if measurement_uid is not None: - try: - s3path, linenum = measurement_uid_to_s3path_linenum(db, measurement_uid) - except MsmtNotFound: - log.error(f"Measurement {measurement_uid} not found in jsonl") - return None - - else: - inp = input or "" # NULL/None input is stored as '' - try: - s3path, linenum = report_id_input_to_s3path_linenum(db, report_id, inp) - except Exception: - log.error(f"Measurement {report_id} {inp} not found in jsonl") - return None - - try: - log.debug(f"Fetching file {s3path} from S3") - # TODO(arturo): remove ignore once https://github.com/jsocol/pystatsd/pull/184 lands - return _fetch_jsonl_measurement_body_from_s3(s3path, linenum) # type: ignore - except Exception: # pragma: no cover - log.error(f"Failed to fetch file {s3path} from S3") - return None - - def _unwrap_post(post: dict) -> dict: fmt = post.get("format", "") if fmt == "json": @@ -203,29 +161,6 @@ def _unwrap_post(post: dict) -> dict: raise Exception("Unexpected format") -def _fetch_measurement_body_on_disk_by_msmt_uid(msmt_uid: str) -> Optional[bytes]: - """ - Fetch raw POST from disk, extract msmt - This is used only for msmts that have been processed by the fastpath - but are not uploaded to S3 yet. - YAML msmts not supported: requires implementing normalization here - """ - assert msmt_uid.startswith("20") - tstamp, cc, testname, hash_ = msmt_uid.split("_") - hour = tstamp[:10] - int(hour) # raise if the string does not contain an integer - spooldir = Path("/var/lib/ooniapi/measurements/incoming/") - postf = spooldir / f"{hour}_{cc}_{testname}/{msmt_uid}.post" - log.debug(f"Attempt at reading {postf}") - try: - with postf.open() as f: - post = ujson.load(f) - except FileNotFoundError: - return None - body = _unwrap_post(post) - return ujson.dumps(body).encode() - - def _fetch_measurement_body_from_hosts(other_collectors: List[str], msmt_uid: str) -> Optional[bytes]: """ Fetch raw POST from another API host, extract msmt @@ -264,61 +199,88 @@ def _fetch_measurement_body_from_hosts(other_collectors: List[str], msmt_uid: st return None +def _fetch_jsonl_measurement_body_clickhouse( + db: ClickhouseClient, + report_id: str, + input_: Optional[str], + measurement_uid: Optional[str], +) -> Optional[bytes]: + """ + Fetch jsonl from S3, decompress it, extract single msmt + """ + # TODO: switch to _fetch_measurement_body_by_uid + if measurement_uid is not None: + try: + s3path, linenum = measurement_uid_to_s3path_linenum(db, measurement_uid) + except MeasurementNotFound: + log.error(f"Measurement {measurement_uid} not found in jsonl") + return None + + else: + inp = input_ or "" # NULL/None input is stored as '' + try: + s3path, linenum = report_id_input_to_s3path_linenum(db, report_id, inp) + except Exception: + log.error(f"Measurement {report_id} {inp} not found in jsonl") + return None + + try: + log.debug(f"Fetching file {s3path} from S3") + # TODO(arturo): remove ignore once https://github.com/jsocol/pystatsd/pull/184 lands + return _fetch_jsonl_measurement_body_from_s3(s3path, linenum) # type: ignore + except Exception: # pragma: no cover + log.error(f"Failed to fetch file {s3path} from S3") + return None + + def _fetch_measurement_body( db: ClickhouseClient, settings: Settings, report_id: str, - input: Optional[str], + input_: Optional[str], measurement_uid: str ) -> bytes: """ Fetch measurement body from either: - - local measurement spool dir (.post files) - JSONL files on S3 - remote measurement spool dir (another API/collector host) """ # TODO: uid_cleanup - log.debug(f"Fetching body for {report_id} {input}") + log.debug(f"Fetching body for {report_id} {input_}") u_count = report_id.count("_") - # 5: Current format e.g. - # 20210124T210009Z_webconnectivity_VE_22313_n1_Ojb - new_format = u_count == 5 and measurement_uid + # Current format e.g. 20210124T210009Z_webconnectivity_VE_22313_n1_Ojb + new_format = (u_count == 5 and measurement_uid) - fresh = False - if new_format: + # if the measurement belongs to an old data format, fetch it from the clickhouse tables + if not new_format: + body = _fetch_jsonl_measurement_body_clickhouse(db, report_id, input_, measurement_uid) + + else: ts = (datetime.now(timezone.utc) - timedelta(hours=1)).strftime("%Y%m%d%H%M") fresh = measurement_uid > ts - other_collectors = settings.other_collectors - # Do the fetching in different orders based on the likelyhood of success - if new_format and fresh: - body = ( - _fetch_measurement_body_on_disk_by_msmt_uid(measurement_uid) - or _fetch_measurement_body_from_hosts(other_collectors, measurement_uid) - or _fetch_jsonl_measurement_body_clickhouse( - db, report_id, input, measurement_uid + other_collectors = settings.other_collectors + # Do the fetching in different orders based on the likelyhood of success + if new_format and fresh: + body = ( + _fetch_measurement_body_from_hosts(other_collectors, measurement_uid) + or _fetch_jsonl_measurement_body_clickhouse( + db, report_id, input_, measurement_uid + ) ) - ) - elif new_format and not fresh: - body = ( - _fetch_jsonl_measurement_body_clickhouse( - db, report_id, input, measurement_uid + elif new_format and not fresh: + body = ( + _fetch_jsonl_measurement_body_clickhouse( + db, report_id, input_, measurement_uid + ) + or _fetch_measurement_body_from_hosts(other_collectors, measurement_uid) ) - or _fetch_measurement_body_on_disk_by_msmt_uid(measurement_uid) - or _fetch_measurement_body_from_hosts(other_collectors, measurement_uid) - ) - - else: - body = _fetch_jsonl_measurement_body_clickhouse( - db, report_id, input, measurement_uid - ) if body: return body # type: ignore - raise MsmtNotFound - + raise MeasurementNotFound def format_msmt_meta(msmt_meta: dict) -> dict: @@ -356,33 +318,37 @@ def _get_measurement_meta_clickhouse( WHERE fastpath.input = :input AND fastpath.report_id = :report_id """ - query_params = dict(input=input_, report_id=report_id) query += "LIMIT 1" + query_params = dict(input=input_, report_id=report_id) + msmt_meta = query_click_one_row(db, sql_text(query), query_params, query_prio=3) + if not msmt_meta: return {} # measurement not found if msmt_meta["probe_asn"] == 0: # https://ooni.org/post/2020-ooni-probe-asn-incident-report/ # https://github.com/ooni/explorer/issues/495 - return {} # unwanted + return {} return format_msmt_meta(msmt_meta) def _get_measurement_meta_by_uid(db: ClickhouseClient, measurement_uid: str) -> dict: - query = """SELECT * FROM fastpath + query = """ + SELECT * FROM fastpath LEFT OUTER JOIN citizenlab ON citizenlab.url = fastpath.input WHERE measurement_uid = :uid LIMIT 1 - """ + """ query_params = dict(uid=measurement_uid) msmt_meta = query_click_one_row(db, sql_text(query), query_params, query_prio=3) + if not msmt_meta: return {} # measurement not found if msmt_meta["probe_asn"] == 0: # https://ooni.org/post/2020-ooni-probe-asn-incident-report/ # https://github.com/ooni/explorer/issues/495 - return {} # unwanted + return {} return format_msmt_meta(msmt_meta) @@ -411,7 +377,7 @@ async def get_raw_measurement( settings=Depends(get_settings), ) -> Response: """ - Get raw measurement body by report_id + input + Get raw measurement body """ # This is used by Explorer to let users download msmts if measurement_uid: @@ -421,7 +387,7 @@ async def get_raw_measurement( # _fetch_measurement_body needs the UID msmt_meta = _get_measurement_meta_clickhouse(db, report_id, input) else: - HTTPException(status_code=400, detail="Either report_id or measurement_uid must be provided") + raise HTTPException(status_code=400, detail="Either report_id or measurement_uid must be provided") body = "{}" if msmt_meta: