From 160e7a5b9b8ffceaaee23459c5d0a58e4d4715dd Mon Sep 17 00:00:00 2001 From: Louis Beaumont Date: Mon, 12 Dec 2022 22:00:46 +0100 Subject: [PATCH] 1.4.4: improve logging & api validation --- langame/functions/services.py | 29 +++++++++------- run/collection/bench.py | 16 +++++++-- run/collection/logic.py | 59 +++++++++++++++++++++++++++++--- run/collection/service.prod.yaml | 4 +-- setup.py | 2 +- 5 files changed, 87 insertions(+), 23 deletions(-) diff --git a/langame/functions/services.py b/langame/functions/services.py index 3e01baf..75109a0 100644 --- a/langame/functions/services.py +++ b/langame/functions/services.py @@ -1,3 +1,4 @@ +from concurrent.futures import ThreadPoolExecutor import time import asyncio from typing import List, Optional, Tuple, Any @@ -9,14 +10,14 @@ import pytz from random import choice from firebase_admin import firestore -from google.cloud.firestore import DocumentSnapshot, AsyncClient +from google.cloud.firestore import DocumentSnapshot, Client from sentry_sdk import capture_exception import logging import datetime utc = pytz.UTC -async def request_starter_for_service( +def request_starter_for_service( api_key_doc: DocumentSnapshot, org_doc: DocumentSnapshot, topics: List[str], @@ -42,12 +43,13 @@ async def request_starter_for_service( Returns: Tuple of (starter, user message). """ + logger = logging.getLogger(__name__) if org_doc.to_dict().get("credits", -1) <= 0: message = ( "you do not have enough credits, " + "please buy more on https://langa.me or contact us at contact@langa.me" ) - logging.warning(message) + logger.warning(message) return None, { "message": message, "code": 402, @@ -55,10 +57,10 @@ async def request_starter_for_service( "user_message": message, } - db: AsyncClient = AsyncClient() + db: Client = Client() conversation_starters_history_docs = ( - await db.collection("history").document(org_doc.id).get() + db.collection("history").document(org_doc.id).get() ) conversation_starters_history_list = ( conversation_starters_history_docs.to_dict().get("conversation_starters", []) @@ -68,10 +70,12 @@ async def request_starter_for_service( new_history = [] poll_interval = 0.1 - async def generate() -> Tuple[Optional[DocumentSnapshot], Optional[dict]]: + def generate(i: int) -> Tuple[Optional[DocumentSnapshot], Optional[dict]]: timeout = 60 start_time = time.time() - _, ref = await db.collection("memes").add( + # format to human readable date time + logger.info(f"[{i}] Generating starter at {datetime.datetime.now(utc)}") + _, ref = db.collection("memes").add( { "state": "to-process", "topics": topics, @@ -88,7 +92,7 @@ async def generate() -> Tuple[Optional[DocumentSnapshot], Optional[dict]]: # poll until it's in state "processed" or "error", timeout after 1 minute while True: - prompt_doc = await db.collection("memes").document(ref.id).get() + prompt_doc = db.collection("memes").document(ref.id).get() data = prompt_doc.to_dict() if data.get("state") == "processed" and data.get("content", None): if translated and not data.get("translated", None): @@ -99,9 +103,10 @@ async def generate() -> Tuple[Optional[DocumentSnapshot], Optional[dict]]: "createdAt": datetime.datetime.now(utc), } ) + logger.info(f"[{i}] Generated starter in {time.time() - start_time}s") return prompt_doc, None if data.get("state") == "error": - logging.error( + logger.error( f"Failed to request starter for {api_key_doc.id}", exc_info=1 ) error = data.get("error", "unknown error") @@ -137,7 +142,8 @@ async def generate() -> Tuple[Optional[DocumentSnapshot], Optional[dict]]: time.sleep(poll_interval) # generate in parallel for "limit" - responses = await asyncio.gather(*[generate() for _ in range(limit)]) + with ThreadPoolExecutor(limit) as executor: + responses = executor.map(generate, range(limit)) conversation_starters, errors = zip(*responses) # if any are errors, return the first error if any(errors): @@ -154,14 +160,13 @@ async def generate() -> Tuple[Optional[DocumentSnapshot], Optional[dict]]: ) + new_history ) - # sync org_doc.reference.update( { "credits": firestore.Increment(-1), "lastSpent": firestore.SERVER_TIMESTAMP, } ) - await conversation_starters_history_docs.reference.set( + conversation_starters_history_docs.reference.set( {"conversation_starters": conversation_starters_history}, merge=True ) diff --git a/run/collection/bench.py b/run/collection/bench.py index 8180f39..1b9af89 100644 --- a/run/collection/bench.py +++ b/run/collection/bench.py @@ -1,13 +1,14 @@ import os -import timeit # disable pylint for docstring # pylint: disable=C0116 # pylint: disable=C0115 import requests import random +from pyinstrument import Profiler -URL = "https://api.langa.me" +IS_LOCAL = os.environ.get("IS_TESTING", "false") == "true" +URL = "http://127.0.0.1:8080" if IS_LOCAL else "https://api.langa.me" fun_topics = [ "fun", @@ -26,6 +27,7 @@ def request(): data = { # pick 2 random topic "topics": random.sample(fun_topics, 2), + "limit": 3, } print("querying with topics: ", data["topics"]) r = requests.post( @@ -39,4 +41,12 @@ def request(): print("OUTPUT:", data["results"][0]["conversation_starter"]["en"]) -timeit.repeat(request, repeat=10, number=1) + +profiler = Profiler() +REPEAT = 2 + +for i in range(REPEAT): + profiler.start() + request() + profiler.stop() + profiler.print() diff --git a/run/collection/logic.py b/run/collection/logic.py index 8e24ae3..47988c8 100644 --- a/run/collection/logic.py +++ b/run/collection/logic.py @@ -8,10 +8,21 @@ from langame.functions.services import request_starter_for_service from firebase_admin import firestore, initialize_app from google.cloud.firestore import Client +import re initialize_app() -logger = logging.getLogger() +logger = logging.getLogger(__name__) +# set logging format with time logging.basicConfig(level=logging.INFO) +logging.Formatter.converter = time.gmtime +logging.Formatter.default_time_format = "%Y-%m-%d %H:%M:%S" +logging.Formatter.default_msec_format = "%s.%03d" +logger.addHandler(logging.StreamHandler()) +logger.handlers[0].setFormatter( + logging.Formatter( + "%(asctime)s - %(name)s - %(levelname)s - %(message)s", datefmt="%d-%b-%y %H:%M:%S" + ) +) db: Client = firestore.client() @@ -109,7 +120,7 @@ async def create_starter(): limit = json_data.get("limit", 1) translated = json_data.get("translated", False) personas = json_data.get("personas", []) - logging.info(f"Inputs:\n{json_data}") + logger.info(f"Inputs:\n{json_data}") if len(personas) > 4: return ( jsonify( @@ -139,13 +150,51 @@ async def create_starter(): {}, ) + # if topics is garbage (too long, too short, special characters, + # None, non alpha-numeric, etc.) + if not topics: + return ( + jsonify( + { + "error": { + "message": "No topics provided", + "status": "INVALID_ARGUMENT", + }, + "results": [], + } + ), + 400, + {}, + ) + if ( + any(len(topic) > 50 for topic in topics) + or any(len(topic) < 3 for topic in topics) + or any(not re.match(r"^[a-zA-Z ]+$", topic) for topic in topics) + ): + return ( + jsonify( + { + "error": { + "message": "Topics are invalid" + + " (only alphanumeric characters and spaces allowed)" + + " for example: 'dating' or 'love, relationships'" + + " and must be between 3 and 50 characters long", + "status": "INVALID_ARGUMENT", + }, + "results": [], + } + ), + 400, + {}, + ) + # limit max 20 - if limit > 20: + if limit > 10: return ( jsonify( { "error": { - "message": "You can only request up to 20 memes at a time", + "message": "You can only request up to 10 conversation starters at a time", "status": "INVALID_ARGUMENT", }, "results": [], @@ -159,7 +208,7 @@ async def create_starter(): op="task", name="request_starter_for_service" ) as span: # https://cloud.google.com/run/docs/tips/general#avoid_background_activities_if_cpu_is_allocated_only_during_request_processing - conversation_starters, error = await request_starter_for_service( + conversation_starters, error = request_starter_for_service( api_key_doc=api_key_doc, org_doc=org_doc, topics=topics, diff --git a/run/collection/service.prod.yaml b/run/collection/service.prod.yaml index a7745bb..c22e07b 100644 --- a/run/collection/service.prod.yaml +++ b/run/collection/service.prod.yaml @@ -14,13 +14,13 @@ spec: spec: containerConcurrency: 5 containers: - image: gcr.io/langame-86ac4/collection:1.2.0 + image: gcr.io/langame-86ac4/collection:1.2.1 ports: - containerPort: 8080 name: http1 env: - name: SENTRY_RELEASE - value: "1.2.0" + value: "1.2.1" - name: ENVIRONMENT value: "production" resources: diff --git a/setup.py b/setup.py index b495ffa..e1d11ef 100644 --- a/setup.py +++ b/setup.py @@ -20,7 +20,7 @@ name="langame", packages=find_packages(), include_package_data=True, - version="1.4.3", + version="1.4.4", description="", install_requires=[ "firebase_admin",