diff --git a/dblib/dblib.py b/dblib/dblib.py index 12c7c0a..e4021eb 100644 --- a/dblib/dblib.py +++ b/dblib/dblib.py @@ -42,6 +42,21 @@ def __init__(self, id, level, message, tool, timezone="UTC", logged_at=None, com self.comment = comment +class Logger: + def __init__(self, tool: str, cursor: Cursor): + self.tool = tool + self.cursor = cursor + + def info(self, message: str, comment: str = ""): + insert_log(self.cursor, "INFO", message, self.tool, comment) + + def error(self, message: str, comment: str = ""): + insert_log(self.cursor, "ERROR", message, self.tool, comment) + + def warning(self, message: str, comment: str = ""): + insert_log(self.cursor, "WARNING", message, self.tool, comment) + + def get_group_by_id(cursor, id: int) -> Union[PgGroup, None]: try: cursor.execute('SELECT * FROM "Group" WHERE id = (%s)', (id,)) diff --git a/handler/Dockerfile b/handler/Dockerfile index cfeb04d..e7a25b8 100644 --- a/handler/Dockerfile +++ b/handler/Dockerfile @@ -7,14 +7,12 @@ WORKDIR /app COPY requirements.txt . COPY dblib.py . COPY lib.py . -COPY test_router.py . -COPY send_router.py . -COPY groups_router.py . COPY store.py . +COPY routers/ routers/ +COPY main.py . RUN pip install --no-cache-dir -r requirements.txt -COPY main.py . - CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "5501"] +LABEL org.opencontainers.image.source=https://github.com/kougen/sms-automation-backend diff --git a/handler/main.py b/handler/main.py index 66884c1..865f881 100644 --- a/handler/main.py +++ b/handler/main.py @@ -1,14 +1,22 @@ -from fastapi.responses import JSONResponse -from psycopg.errors import InvalidTextRepresentation -import datetime -from fastapi import FastAPI -import threading -from lib import msgs, LogRequest, LogsRequest, PhoneRequest, PutPendingMessageRequest, BASE_PATH -from dblib import get_group_by_id, is_server_up, get_db_cursor_and_connection, insert_log, insert_logs, PgLog -from store import SRV_ADDR, HANDLER_VERSION, CHECKER_VERSION, WEBSITE_VERSION, RECIEVER_VERSION, SELF_URL -from test_router import testrouter -from send_router import sendrouter -from groups_router import groupsrouter +from fastapi import FastAPI +from pathlib import Path +import os +import sys +from routers.test_router import testrouter +from routers.send_router import sendrouter +from routers.groups_router import groupsrouter +from routers.health_router import health_router +from routers.log_router import log_router +from routers.messages_router import messagesrouter + +path_root = Path(__file__).parents[1] +script_path = os.path.join(path_root) +sys.path.append(script_path) + + +from lib import msgs +from dblib import get_db_cursor_and_connection +from store import SELF_URL, BASE_PATH result = get_db_cursor_and_connection() @@ -82,247 +90,16 @@ app.include_router(testrouter, prefix="/test") app.include_router(sendrouter, prefix="/send") app.include_router(groupsrouter, prefix="/groups") +app.include_router(health_router, prefix="/health") +app.include_router(log_router, prefix="/logs") +app.include_router(messagesrouter, prefix="/messages") @app.get("/", tags=["health"]) async def home_api(): return {"message": "Hello World"} -@app.get("/ping", tags=["health"]) -async def ping(): - data = { - "time": datetime.datetime.now().isoformat(), - "timezone": "UTC", - "status": "up" - } - return JSONResponse(content=data) - - -@app.get("/ping/{host}", tags=["health"]) -async def ping_db(host: str): - if host == "phone": - result = is_server_up(SRV_ADDR) - return { "host": SRV_ADDR, "status": "up" if result else "down"} - - -@app.get("/version", tags=["health"]) -async def get_version(): - return { - "handler": HANDLER_VERSION, - "checker": CHECKER_VERSION, - "website": WEBSITE_VERSION, - "reciever": RECIEVER_VERSION - } - -@app.delete('/purge/db', tags=["health"]) -async def purge_database(): - try: - cursor.execute('DELETE FROM "Log"') - cursor.execute('DELETE FROM "Recipient"') - cursor.execute('DELETE FROM "Group"') - cursor.execute('DELETE FROM "RunningJobs"') - connection.commit() - return {"message": "Database Purged", "success": True} - except Exception as e: - print(e) - return {"message": "Error Occurred: " + str(e), "success": False} - -@app.delete('/purge/{table}', tags=["health"]) -async def purge_table(table: str): - try: - cursor.execute(f'DELETE FROM %s', (table,)) - connection.commit() - return {"message": f"{table} Purged", "success": True} - except Exception as e: - print(e) - return {"message": "Error Occurred: " + str(e), "success": False} - - @app.get("/trail-messages") async def get_stopwords(): return msgs - -@app.delete("/cancel", tags=["groups"]) -def delete_recipient_from_group(phone: PhoneRequest, id: int = -1): - query = "" - data = tuple() - if id == -1: - query = 'DELETE FROM "Recipient" WHERE "phone" = (%s)' - data = (phone.phone_number,) - group = get_group_by_id(cursor, id) - if not group: - return {"message": "Invalid ID", "success": False} - else: - query = 'DELETE FROM "Recipient" WHERE "groupId" = (%s) AND "phone" = (%s)' - data = (id, phone.phone_number) - try: - cursor.execute(query, data) - connection.commit() - if id == -1: - return {"message": "Recipient Deleted", "success": True} - else: - return {"message": f"Recipient Deleted from group: {id}", "success": True} - except InvalidTextRepresentation as e: - print(e) - return {"message": "Invalid ID", "success": False} - except Exception as e: - print(e) - return {"message": "Error Occurred: " + str(e), "success": False} - - -@app.post("/log", tags=["logs"]) -def log_message(request: LogRequest): - level = request.level - message = request.message - tool = request.tool - comment = request.comment - logged_at = request.logged_at - if not level or not message or not tool: - return {"message": "Invalid Request", "success": False} - try: - insert_log(cursor, level, message, tool, comment, logged_at=logged_at) - return {"message": "Log Inserted", "success": True} - except Exception as e: - print(e) - return {"message": "Error Occurred: " + str(e), "success": False} - -def convertRequestToLog(request: LogRequest): - return PgLog(request.level, request.message, request.tool, request.comment, request.timezone, request.logged_at) - -def runAsyncLogInsert(c, logs): - print(f"Inserting {len(logs)} logs") - try: - insert_logs(c, logs) - print(f"Inserted {len(logs)} logs") - except Exception as e: - print(e) - -def insert_logs_thread(logs: list[PgLog]): - split_logs = [logs[i:i + 50] for i in range(0, len(logs), 50)] - print(f"Split into {len(split_logs)} batches") - cursors = [get_db_cursor_and_connection()[0] for i in range(len(split_logs))] - print(f"Got {len(cursors)} cursors") - try: - for log_batch in split_logs: - threading.Thread(target=runAsyncLogInsert, args=(cursors.pop(), log_batch)).start() - print(f"Started {len(split_logs)} threads") - except Exception as e: - print(e) - print(f"Error inserting logs: {e}") - -@app.post("/logs", tags=["logs"]) -async def log_multiple_messages(request: LogsRequest): - logs = request.logs - print(f"Received {len(logs)} logs") - pgLogs = [convertRequestToLog(log) for log in logs] - threading.Thread(target=insert_logs_thread, args=(pgLogs,)).start() - return {"message": "Thanks for the logs", "success": True} - -@app.delete("/logs", tags=["logs"]) -async def delete_logs(mode: str): - try: - if mode == "all": - cursor.execute('DELETE FROM "Log"') - connection.commit() - return {"message": "All Logs Deleted", "success": True} - elif mode == "old": - cursor.execute('DELETE FROM "Log" WHERE "timestamp" < now() - interval \'30 days\'') - connection.commit() - return {"message": "Old Logs Deleted", "success": True} - else: - return {"message": "Invalid Mode", "success": False} - except Exception as e: - print(e) - return {"message": "Error Occurred: " + str(e), "success": False} - - -@app.get("/logs", tags=["logs"]) -async def get_logs(): - try: - cursor.execute('SELECT * FROM "Log"') - result = cursor.fetchall() - return { "logs": result} - except Exception as e: - print(e) - return {"message": "Error Occurred: " + str(e), "success": False} - - -@app.get("/logs/{level}", tags=["logs"]) -async def get_logs_by_level(level: str): - try: - cursor.execute('SELECT * FROM "Log" WHERE "level" = (%s)', (level,)) - result = cursor.fetchall() - return { "logs": result} - except Exception as e: - print(e) - return {"message": "Error Occurred: " + str(e), "success": False} - - -@app.get("/logs/{level}/{tool}", tags=["logs"]) -async def get_logs_by_level_and_tool(level: str, tool: str): - try: - cursor.execute('SELECT * FROM "Log" WHERE "level" = (%s) AND "tool" = (%s)', (level, tool)) - result = cursor.fetchall() - return { "logs": result} - except Exception as e: - print(e) - return {"message": "Error Occurred: " + str(e), "success": False} - - -@app.get("/messages", tags=["messages"]) -async def get_pending_messages(filter: str = "pending"): - query = "" - if filter == "all": - query = 'SELECT * FROM "PendingMessage"' - elif filter == "sent": - query = 'SELECT * FROM "PendingMessage" WHERE "status" = "sent"' - elif filter == "failed": - query = 'SELECT * FROM "PendingMessage" WHERE "status" = "failed"' - elif filter == "pending": - query = 'SELECT * FROM "PendingMessage" WHERE "status" = "pending"' - try: - cursor.execute(query) - result = cursor.fetchall() - return { "messages": result} - except Exception as e: - print(e) - return {"message": "Error Occurred: " + str(e), "success": False} - - -@app.get("/messages/{id}", tags=["messages"]) -async def get_pending_messages_by_group_id(id: int): - try: - cursor.execute('SELECT * FROM "PendingMessage" WHERE "groupId" = (%s)', (id,)) - result = cursor.fetchall() - return { "messages": result} - except Exception as e: - print(e) - return {"message": "Error Occurred: " + str(e), "success": False} - - -@app.get("/messages/{id}/recipients", tags=["messages"]) -async def get_pending_messages_recipients(id: int): - try: - cursor.execute('SELECT * FROM "PendingMessage" WHERE "groupId" = (%s)', (id,)) - result = cursor.fetchall() - return { "messages": result} - except Exception as e: - print(e) - return {"message": "Error Occurred: " + str(e), "success": False} - - -@app.put("/messages/{id}", tags=["messages"]) -async def update_pending_message_status(id: int, request: PutPendingMessageRequest): - status = request.status - - if status not in ["pending", "sent", "failed"]: - return {"message": "Invalid Status", "success": False} - - try: - cursor.execute('UPDATE "PendingMessage" SET "status" = (%s) WHERE "id" = (%s)', (status, id)) - connection.commit() - return {"message": "Status Updated", "success": True} - except Exception as e: - print(e) - return {"message": "Error Occurred: " + str(e), "success": False} diff --git a/handler/routers/__init__.py b/handler/routers/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/handler/groups_router.py b/handler/routers/groups_router.py similarity index 50% rename from handler/groups_router.py rename to handler/routers/groups_router.py index fd76737..d3d01dc 100644 --- a/handler/groups_router.py +++ b/handler/routers/groups_router.py @@ -1,14 +1,15 @@ from fastapi import APIRouter from psycopg.errors import InvalidTextRepresentation -from lib import append_cancel_message -from dblib import get_db_cursor_and_connection, get_group_by_id, get_recipients_by_group_id - +from lib import append_cancel_message, PhoneRequest +from dblib import get_db_cursor_and_connection, get_group_by_id, get_recipients_by_group_id, Logger groupsrouter = APIRouter() cursor, connection = get_db_cursor_and_connection() +logger = Logger("HANDLER:GROUPS", cursor) + @groupsrouter.get("/", tags=["groups"]) async def get_groups(): cursor.execute('SELECT * FROM "Group"') @@ -39,3 +40,28 @@ async def get_group_details(id: int): except Exception as e: print(e) return {"message": "Error Occurred: " + str(e)} + + +@groupsrouter.delete("/cancel", tags=["groups"]) +def delete_recipient_from_group(phone: str): + if not phone: + return {"message": "Invalid Phone Number", "success": False} + + try: + existing_recipient = cursor.execute('SELECT * FROM "Recipient" WHERE "phone" = (%s)', (phone,)) + if not existing_recipient: + logger.info(f"Recipient with phone number {phone} not found") + return {"message": "Recipient not found", "success": False} + else: + cursor.execute('DELETE FROM "Recipient" WHERE "phone" = (%s)', (phone,)) + connection.commit() + logger.info(f"Recipient with phone number {phone} deleted") + return {"message": "Recipient Deleted", "success": True} + except InvalidTextRepresentation as e: + print(e) + logger.error(f"Invalid phone number: {phone}") + return {"message": "Invalid ID", "success": False} + except Exception as e: + print(e) + logger.error(f"Error deleting recipient: {phone}") + return {"message": "Error Occurred: " + str(e), "success": False} diff --git a/handler/routers/health_router.py b/handler/routers/health_router.py new file mode 100644 index 0000000..b42e89f --- /dev/null +++ b/handler/routers/health_router.py @@ -0,0 +1,60 @@ +from fastapi import APIRouter +from dblib import is_server_up, get_db_cursor_and_connection +from fastapi.responses import JSONResponse +import datetime +from store import SRV_ADDR, HANDLER_VERSION, CHECKER_VERSION, WEBSITE_VERSION, RECIEVER_VERSION + +health_router = APIRouter() + +cursor, connection = get_db_cursor_and_connection() + +@health_router.get("/ping", tags=["health"]) +async def ping(): + data = { + "time": datetime.datetime.now().isoformat(), + "timezone": "UTC", + "status": "up" + } + return JSONResponse(content=data) + + +@health_router.get("/ping/{host}", tags=["health"]) +async def ping_db(host: str): + if host == "phone": + result = is_server_up(SRV_ADDR) + return { "host": SRV_ADDR, "status": "up" if result else "down"} + + +@health_router.get("/version", tags=["health"]) +async def get_version(): + return { + "handler": HANDLER_VERSION, + "checker": CHECKER_VERSION, + "website": WEBSITE_VERSION, + "reciever": RECIEVER_VERSION + } + + + +@health_router.delete('/purge/db', tags=["health"]) +async def purge_database(): + try: + cursor.execute('DELETE FROM "Log"') + cursor.execute('DELETE FROM "Recipient"') + cursor.execute('DELETE FROM "Group"') + cursor.execute('DELETE FROM "RunningJobs"') + connection.commit() + return {"message": "Database Purged", "success": True} + except Exception as e: + print(e) + return {"message": "Error Occurred: " + str(e), "success": False} + +@health_router.delete('/purge/{table}', tags=["health"]) +async def purge_table(table: str): + try: + cursor.execute(f'DELETE FROM %s', (table,)) + connection.commit() + return {"message": f"{table} Purged", "success": True} + except Exception as e: + print(e) + return {"message": "Error Occurred: " + str(e), "success": False} diff --git a/handler/routers/log_router.py b/handler/routers/log_router.py new file mode 100644 index 0000000..b324e47 --- /dev/null +++ b/handler/routers/log_router.py @@ -0,0 +1,105 @@ +from fastapi import APIRouter +from dblib import get_db_cursor_and_connection, insert_log, insert_logs, PgLog, get_db_cursor_and_connection +from lib import LogRequest, LogsRequest +import threading + +log_router = APIRouter() +cursor, connection = get_db_cursor_and_connection() + +@log_router.post("/log", tags=["logs"]) +def log_message(request: LogRequest): + level = request.level + message = request.message + tool = request.tool + comment = request.comment + logged_at = request.logged_at + if not level or not message or not tool: + return {"message": "Invalid Request", "success": False} + try: + insert_log(cursor, level, message, tool, comment, logged_at=logged_at) + return {"message": "Log Inserted", "success": True} + except Exception as e: + print(e) + return {"message": "Error Occurred: " + str(e), "success": False} + +def convertRequestToLog(request: LogRequest): + return PgLog(request.level, request.message, request.tool, request.comment, request.timezone, request.logged_at) + +def runAsyncLogInsert(c, logs): + print(f"Inserting {len(logs)} logs") + try: + insert_logs(c, logs) + print(f"Inserted {len(logs)} logs") + except Exception as e: + print(e) + +def insert_logs_thread(logs: list[PgLog]): + split_logs = [logs[i:i + 50] for i in range(0, len(logs), 50)] + print(f"Split into {len(split_logs)} batches") + cursors = [get_db_cursor_and_connection()[0] for i in range(len(split_logs))] + print(f"Got {len(cursors)} cursors") + try: + for log_batch in split_logs: + threading.Thread(target=runAsyncLogInsert, args=(cursors.pop(), log_batch)).start() + print(f"Started {len(split_logs)} threads") + except Exception as e: + print(e) + print(f"Error inserting logs: {e}") + +@log_router.post("/logs", tags=["logs"]) +async def log_multiple_messages(request: LogsRequest): + logs = request.logs + print(f"Received {len(logs)} logs") + pgLogs = [convertRequestToLog(log) for log in logs] + threading.Thread(target=insert_logs_thread, args=(pgLogs,)).start() + return {"message": "Thanks for the logs", "success": True} + +@log_router.delete("/logs", tags=["logs"]) +async def delete_logs(mode: str): + try: + if mode == "all": + cursor.execute('DELETE FROM "Log"') + connection.commit() + return {"message": "All Logs Deleted", "success": True} + elif mode == "old": + cursor.execute('DELETE FROM "Log" WHERE "timestamp" < now() - interval \'30 days\'') + connection.commit() + return {"message": "Old Logs Deleted", "success": True} + else: + return {"message": "Invalid Mode", "success": False} + except Exception as e: + print(e) + return {"message": "Error Occurred: " + str(e), "success": False} + + +@log_router.get("/logs", tags=["logs"]) +async def get_logs(): + try: + cursor.execute('SELECT * FROM "Log"') + result = cursor.fetchall() + return { "logs": result} + except Exception as e: + print(e) + return {"message": "Error Occurred: " + str(e), "success": False} + + +@log_router.get("/logs/{level}", tags=["logs"]) +async def get_logs_by_level(level: str): + try: + cursor.execute('SELECT * FROM "Log" WHERE "level" = (%s)', (level,)) + result = cursor.fetchall() + return { "logs": result} + except Exception as e: + print(e) + return {"message": "Error Occurred: " + str(e), "success": False} + + +@log_router.get("/logs/{level}/{tool}", tags=["logs"]) +async def get_logs_by_level_and_tool(level: str, tool: str): + try: + cursor.execute('SELECT * FROM "Log" WHERE "level" = (%s) AND "tool" = (%s)', (level, tool)) + result = cursor.fetchall() + return { "logs": result} + except Exception as e: + print(e) + return {"message": "Error Occurred: " + str(e), "success": False} diff --git a/handler/routers/messages_router.py b/handler/routers/messages_router.py new file mode 100644 index 0000000..e8db499 --- /dev/null +++ b/handler/routers/messages_router.py @@ -0,0 +1,65 @@ + +from fastapi import APIRouter +from lib import PutPendingMessageRequest +from dblib import get_db_cursor_and_connection + +messagesrouter = APIRouter() + +cursor, connection = get_db_cursor_and_connection() + +@messagesrouter.get("/", tags=["messages"]) +async def get_pending_messages(filter: str = "pending"): + query = "" + if filter == "all": + query = 'SELECT * FROM "PendingMessage"' + elif filter == "sent": + query = 'SELECT * FROM "PendingMessage" WHERE "status" = "sent"' + elif filter == "failed": + query = 'SELECT * FROM "PendingMessage" WHERE "status" = "failed"' + elif filter == "pending": + query = 'SELECT * FROM "PendingMessage" WHERE "status" = "pending"' + try: + cursor.execute(query) + result = cursor.fetchall() + return { "messages": result} + except Exception as e: + print(e) + return {"message": "Error Occurred: " + str(e), "success": False} + + +@messagesrouter.get("/{id}", tags=["messages"]) +async def get_pending_messages_by_group_id(id: int): + try: + cursor.execute('SELECT * FROM "PendingMessage" WHERE "groupId" = (%s)', (id,)) + result = cursor.fetchall() + return { "messages": result} + except Exception as e: + print(e) + return {"message": "Error Occurred: " + str(e), "success": False} + + +@messagesrouter.get("/{id}/recipients", tags=["messages"]) +async def get_pending_messages_recipients(id: int): + try: + cursor.execute('SELECT * FROM "PendingMessage" WHERE "groupId" = (%s)', (id,)) + result = cursor.fetchall() + return { "messages": result} + except Exception as e: + print(e) + return {"message": "Error Occurred: " + str(e), "success": False} + + +@messagesrouter.put("/{id}", tags=["messages"]) +async def update_pending_message_status(id: int, request: PutPendingMessageRequest): + status = request.status + + if status not in ["pending", "sent", "failed"]: + return {"message": "Invalid Status", "success": False} + + try: + cursor.execute('UPDATE "PendingMessage" SET "status" = (%s) WHERE "id" = (%s)', (status, id)) + connection.commit() + return {"message": "Status Updated", "success": True} + except Exception as e: + print(e) + return {"message": "Error Occurred: " + str(e), "success": False} diff --git a/handler/send_router.py b/handler/routers/send_router.py similarity index 95% rename from handler/send_router.py rename to handler/routers/send_router.py index aa6ddae..2e9af1b 100644 --- a/handler/send_router.py +++ b/handler/routers/send_router.py @@ -1,6 +1,6 @@ from psycopg.errors import InvalidTextRepresentation from lib import IdRequest, broadcast_bulk_send, send_message -from dblib import get_group_by_id, get_recipients_by_group_id, is_server_up, get_db_cursor_and_connection +from dblib import get_group_by_id, get_recipients_by_group_id, is_server_up, get_db_cursor_and_connection, Logger from store import SRV_ADDR from fastapi import APIRouter @@ -8,6 +8,7 @@ cursor, connection = get_db_cursor_and_connection() +logger = Logger("HANDLER:SEND", cursor) @sendrouter.post("/", tags=["send"]) async def send_msg_to_recipients(request: IdRequest): diff --git a/handler/test_router.py b/handler/routers/test_router.py similarity index 100% rename from handler/test_router.py rename to handler/routers/test_router.py diff --git a/handler/store.py b/handler/store.py index f3ab963..3ef85f9 100644 --- a/handler/store.py +++ b/handler/store.py @@ -9,6 +9,7 @@ WEBSITE_VERSION = os.getenv("WEBSITE_VERSION") RECIEVER_VERSION = os.getenv("RECIEVER_VERSION") SELF_URL = os.getenv("SELF_URL") +BASE_PATH = os.getenv("BASE_PATH") or "" PHONE_SERVER_IP = os.getenv("PHONE_SERVER_IP") PHONE_SERVER_PORT = os.getenv("PHONE_SERVER_PORT")