From f26e87e6df453f6358365a9c496b55871d52d65b Mon Sep 17 00:00:00 2001 From: lichifeng Date: Wed, 24 Apr 2024 22:10:20 +0800 Subject: [PATCH] Refactor file_processor.py and related modules --- mgxhub/processor/file_processor.py | 8 +------- mgxhub/processor/game2sqlite.py | 17 ++++++++++------- mgxhub/processor/proc_record.py | 5 +---- mgxhub/watcher/watcher.py | 5 +---- webapi/routers/game_reparse.py | 11 +++++------ webapi/routers/game_upload.py | 6 +----- 6 files changed, 19 insertions(+), 33 deletions(-) diff --git a/mgxhub/processor/file_processor.py b/mgxhub/processor/file_processor.py index 015d7a7..6df92f5 100644 --- a/mgxhub/processor/file_processor.py +++ b/mgxhub/processor/file_processor.py @@ -7,8 +7,6 @@ import string from datetime import datetime -from sqlalchemy.orm import Session - from mgxhub import cfg, logger from .allowed_types import ACCEPTED_COMPRESSED_TYPES, ACCEPTED_RECORD_TYPES @@ -47,11 +45,9 @@ class FileProcessor: _cleanup: bool = False _tmpdir: str = None _output: dict = None - _session: Session = None def __init__( self, - session: Session, src: str | io.StringIO | io.BytesIO | io.TextIOWrapper, syncproc: bool = True, s3replace: bool = False, @@ -67,7 +63,6 @@ def __init__( raise ValueError('Buffer meta info required for buffer input.') self._filepath = self._save_buffer(src, *buffermeta) - self._session = session self._syncproc = syncproc self._cleanup = cleanup self._s3replace = s3replace @@ -110,8 +105,7 @@ def _process(self) -> dict: fileext = self._filepath.split('.')[-1].lower() if fileext in ACCEPTED_RECORD_TYPES: logger.debug(f'Proc(record): {self._filepath}') - self._output = process_record(self._session, self._filepath, self._syncproc, - '-b', self._s3replace, self._cleanup) + self._output = process_record(self._filepath, self._syncproc, '-b', self._s3replace, self._cleanup) elif fileext in ACCEPTED_COMPRESSED_TYPES: logger.debug(f'Proc(compressed): {self._filepath}') self._output = process_compressed(self._filepath, self._cleanup) diff --git a/mgxhub/processor/game2sqlite.py b/mgxhub/processor/game2sqlite.py index 1a8ccc7..79c04b7 100644 --- a/mgxhub/processor/game2sqlite.py +++ b/mgxhub/processor/game2sqlite.py @@ -3,14 +3,14 @@ import traceback from sqlalchemy.exc import IntegrityError -from sqlalchemy.orm import Session from mgxhub import logger +from mgxhub.db import db_raw from mgxhub.db.operation import add_game from mgxhub.rating import RatingLock -def save_game_sqlite(session: Session, data: dict, retries: int = 3) -> tuple[str, str]: +def save_game_sqlite(data: dict, retries: int = 3) -> tuple[str, str]: '''Save game data to SQLite database. **Only handles data. Need to clean the file depends on returned status.** @@ -26,28 +26,31 @@ def save_game_sqlite(session: Session, data: dict, retries: int = 3) -> tuple[st ''' try: - result = add_game(session, data) + db = db_raw() + result = add_game(db, data) logger.info(f'Game added: {result}') if result[0] in ['success', 'updated']: RatingLock().start_calc(schedule=True) except IntegrityError: if retries > 0: - session.rollback() # 回滚事务 - return save_game_sqlite(session, data, retries - 1) # 重新调用自身,减少重试次数 + db.rollback() # 回滚事务 + return save_game_sqlite(data, retries - 1) # 重新调用自身,减少重试次数 else: result = 'error', data.get('guid', 'unknown guid') except Exception as e: tb = traceback.format_exc() # 获取异常的详细信息 logger.error(f'game2sqlite error: {e}\n{tb}') # 记录异常的详细信息 result = 'error', data.get('guid', 'unknown guid') + finally: + db.close() return result -async def async_save_game_sqlite(session: Session, data: dict) -> tuple[str, str]: +async def async_save_game_sqlite(data: dict) -> tuple[str, str]: '''Async version of save_game_sqlite. **Only handles data. Need to clean the file depends on returned status.** ''' - return save_game_sqlite(session, data) + return save_game_sqlite(data) diff --git a/mgxhub/processor/proc_record.py b/mgxhub/processor/proc_record.py index c9a74fb..2adc46a 100644 --- a/mgxhub/processor/proc_record.py +++ b/mgxhub/processor/proc_record.py @@ -3,8 +3,6 @@ import os import threading -from sqlalchemy.orm import Session - from mgxhub import logger from mgxhub.parser import parse from mgxhub.util import run_slow_tasks @@ -17,7 +15,6 @@ def process_record( - session: Session, recpath: str, waitio: bool = False, opts: str = '', @@ -62,7 +59,7 @@ def process_record( # Do upload, db insert, etc. tasks = [] - tasks.append(async_save_game_sqlite(session, parsed_result)) + tasks.append(async_save_game_sqlite(parsed_result)) tasks.append(async_save_to_s3(recpath, parsed_result, s3replace, cleanup)) if 'map' in parsed_result and 'base64' in parsed_result['map']: tasks.append(async_save_map(parsed_result['guid'], parsed_result['map']['base64'])) diff --git a/mgxhub/watcher/watcher.py b/mgxhub/watcher/watcher.py index ba59d1b..0f719ed 100644 --- a/mgxhub/watcher/watcher.py +++ b/mgxhub/watcher/watcher.py @@ -25,7 +25,6 @@ from concurrent.futures import ThreadPoolExecutor from mgxhub import cfg, logger, proc_queue -from mgxhub.db import db_raw from mgxhub.processor import FileProcessor from .scanner import scan @@ -79,9 +78,8 @@ def _watch(self): def _process_file(self, file_path): '''Process the file''' - session = db_raw() try: - file_processor = FileProcessor(session, file_path, syncproc=True, s3replace=False, cleanup=True) + file_processor = FileProcessor(file_path, syncproc=True, s3replace=False, cleanup=True) logger.debug(f"[Watcher] {file_path}: {file_processor.result().get('status', 'unknown')}") if os.path.isfile(file_path): os.remove(file_path) @@ -97,4 +95,3 @@ def _process_file(self, file_path): logger.error(f"[Watcher] Error [{file_path}]: {e}") finally: self.q.task_done() - session.close() diff --git a/webapi/routers/game_reparse.py b/webapi/routers/game_reparse.py index d6302f0..52051ac 100644 --- a/webapi/routers/game_reparse.py +++ b/webapi/routers/game_reparse.py @@ -4,7 +4,6 @@ from fastapi import BackgroundTasks from fastapi.responses import JSONResponse -from sqlalchemy.orm import Session from mgxhub import cfg from mgxhub.db import db_raw @@ -14,15 +13,16 @@ from webapi.admin_api import admin_api -def _reparse(session: Session, guid: str) -> None: +def _reparse(guid: str) -> None: oss = S3Adapter(**cfg.s3) - file_records = session.query(File.md5).filter(File.game_guid == guid).all() + db = db_raw() + file_records = db.query(File.md5).filter(File.game_guid == guid).all() file_md5s = [f[0] for f in file_records] + db.close() for filemd5 in file_md5s: downloaded = oss.download(f"/records/{filemd5}.zip") if downloaded: FileProcessor( - session, downloaded, syncproc=True, s3replace=False, @@ -42,7 +42,6 @@ async def reparse_a_record(background_tasks: BackgroundTasks, guid: str) -> dict Defined in: `webapi/routers/game_reparse.py` ''' - session = db_raw() - background_tasks.add_task(_reparse, session, guid) + background_tasks.add_task(_reparse, guid) return JSONResponse(status_code=202, content={"detail": f"Reparse command sent for [{guid}]"}) diff --git a/webapi/routers/game_upload.py b/webapi/routers/game_upload.py index 72667ad..ec88df6 100644 --- a/webapi/routers/game_upload.py +++ b/webapi/routers/game_upload.py @@ -4,11 +4,9 @@ from fastapi import Depends, File, Form, UploadFile from fastapi.security import HTTPBasicCredentials -from sqlalchemy.orm import Session from mgxhub import logger from mgxhub.auth import WPRestAPI -from mgxhub.db import db_dep from mgxhub.processor import FileProcessor from webapi import app from webapi.authdepends import security @@ -20,8 +18,7 @@ async def upload_a_record( lastmod: str = Form(''), s3replace: bool = Form(False), cleanup: bool = Form(True), - creds: HTTPBasicCredentials = Depends(security), - db: Session = Depends(db_dep) + creds: HTTPBasicCredentials = Depends(security) ): '''Upload a record file to the server. @@ -50,7 +47,6 @@ async def upload_a_record( lastmod = datetime.now().isoformat() processed = FileProcessor( - db, recfile.file, syncproc=False, s3replace=s3replace,