Skip to content

Commit

Permalink
Refactor file_processor.py and related modules
Browse files Browse the repository at this point in the history
  • Loading branch information
lichifeng committed Apr 24, 2024
1 parent 6eb3b9a commit f26e87e
Show file tree
Hide file tree
Showing 6 changed files with 19 additions and 33 deletions.
8 changes: 1 addition & 7 deletions mgxhub/processor/file_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@
import string
from datetime import datetime

from sqlalchemy.orm import Session

from mgxhub import cfg, logger

from .allowed_types import ACCEPTED_COMPRESSED_TYPES, ACCEPTED_RECORD_TYPES
Expand Down Expand Up @@ -47,11 +45,9 @@ class FileProcessor:
_cleanup: bool = False
_tmpdir: str = None
_output: dict = None
_session: Session = None

def __init__(
self,
session: Session,
src: str | io.StringIO | io.BytesIO | io.TextIOWrapper,
syncproc: bool = True,
s3replace: bool = False,
Expand All @@ -67,7 +63,6 @@ def __init__(
raise ValueError('Buffer meta info required for buffer input.')
self._filepath = self._save_buffer(src, *buffermeta)

self._session = session
self._syncproc = syncproc
self._cleanup = cleanup
self._s3replace = s3replace
Expand Down Expand Up @@ -110,8 +105,7 @@ def _process(self) -> dict:
fileext = self._filepath.split('.')[-1].lower()
if fileext in ACCEPTED_RECORD_TYPES:
logger.debug(f'Proc(record): {self._filepath}')
self._output = process_record(self._session, self._filepath, self._syncproc,
'-b', self._s3replace, self._cleanup)
self._output = process_record(self._filepath, self._syncproc, '-b', self._s3replace, self._cleanup)
elif fileext in ACCEPTED_COMPRESSED_TYPES:
logger.debug(f'Proc(compressed): {self._filepath}')
self._output = process_compressed(self._filepath, self._cleanup)
Expand Down
17 changes: 10 additions & 7 deletions mgxhub/processor/game2sqlite.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,14 @@
import traceback

from sqlalchemy.exc import IntegrityError
from sqlalchemy.orm import Session

from mgxhub import logger
from mgxhub.db import db_raw
from mgxhub.db.operation import add_game
from mgxhub.rating import RatingLock


def save_game_sqlite(session: Session, data: dict, retries: int = 3) -> tuple[str, str]:
def save_game_sqlite(data: dict, retries: int = 3) -> tuple[str, str]:
'''Save game data to SQLite database.
**Only handles data. Need to clean the file depends on returned status.**
Expand All @@ -26,28 +26,31 @@ def save_game_sqlite(session: Session, data: dict, retries: int = 3) -> tuple[st
'''

try:
result = add_game(session, data)
db = db_raw()
result = add_game(db, data)
logger.info(f'Game added: {result}')
if result[0] in ['success', 'updated']:
RatingLock().start_calc(schedule=True)
except IntegrityError:
if retries > 0:
session.rollback() # 回滚事务
return save_game_sqlite(session, data, retries - 1) # 重新调用自身,减少重试次数
db.rollback() # 回滚事务
return save_game_sqlite(data, retries - 1) # 重新调用自身,减少重试次数
else:
result = 'error', data.get('guid', 'unknown guid')
except Exception as e:
tb = traceback.format_exc() # 获取异常的详细信息
logger.error(f'game2sqlite error: {e}\n{tb}') # 记录异常的详细信息
result = 'error', data.get('guid', 'unknown guid')
finally:
db.close()

return result


async def async_save_game_sqlite(session: Session, data: dict) -> tuple[str, str]:
async def async_save_game_sqlite(data: dict) -> tuple[str, str]:
'''Async version of save_game_sqlite.
**Only handles data. Need to clean the file depends on returned status.**
'''

return save_game_sqlite(session, data)
return save_game_sqlite(data)
5 changes: 1 addition & 4 deletions mgxhub/processor/proc_record.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@
import os
import threading

from sqlalchemy.orm import Session

from mgxhub import logger
from mgxhub.parser import parse
from mgxhub.util import run_slow_tasks
Expand All @@ -17,7 +15,6 @@


def process_record(
session: Session,
recpath: str,
waitio: bool = False,
opts: str = '',
Expand Down Expand Up @@ -62,7 +59,7 @@ def process_record(

# Do upload, db insert, etc.
tasks = []
tasks.append(async_save_game_sqlite(session, parsed_result))
tasks.append(async_save_game_sqlite(parsed_result))
tasks.append(async_save_to_s3(recpath, parsed_result, s3replace, cleanup))
if 'map' in parsed_result and 'base64' in parsed_result['map']:
tasks.append(async_save_map(parsed_result['guid'], parsed_result['map']['base64']))
Expand Down
5 changes: 1 addition & 4 deletions mgxhub/watcher/watcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
from concurrent.futures import ThreadPoolExecutor

from mgxhub import cfg, logger, proc_queue
from mgxhub.db import db_raw
from mgxhub.processor import FileProcessor

from .scanner import scan
Expand Down Expand Up @@ -79,9 +78,8 @@ def _watch(self):
def _process_file(self, file_path):
'''Process the file'''

session = db_raw()
try:
file_processor = FileProcessor(session, file_path, syncproc=True, s3replace=False, cleanup=True)
file_processor = FileProcessor(file_path, syncproc=True, s3replace=False, cleanup=True)
logger.debug(f"[Watcher] {file_path}: {file_processor.result().get('status', 'unknown')}")
if os.path.isfile(file_path):
os.remove(file_path)
Expand All @@ -97,4 +95,3 @@ def _process_file(self, file_path):
logger.error(f"[Watcher] Error [{file_path}]: {e}")
finally:
self.q.task_done()
session.close()
11 changes: 5 additions & 6 deletions webapi/routers/game_reparse.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@

from fastapi import BackgroundTasks
from fastapi.responses import JSONResponse
from sqlalchemy.orm import Session

from mgxhub import cfg
from mgxhub.db import db_raw
Expand All @@ -14,15 +13,16 @@
from webapi.admin_api import admin_api


def _reparse(session: Session, guid: str) -> None:
def _reparse(guid: str) -> None:
oss = S3Adapter(**cfg.s3)
file_records = session.query(File.md5).filter(File.game_guid == guid).all()
db = db_raw()
file_records = db.query(File.md5).filter(File.game_guid == guid).all()
file_md5s = [f[0] for f in file_records]
db.close()
for filemd5 in file_md5s:
downloaded = oss.download(f"/records/{filemd5}.zip")
if downloaded:
FileProcessor(
session,
downloaded,
syncproc=True,
s3replace=False,
Expand All @@ -42,7 +42,6 @@ async def reparse_a_record(background_tasks: BackgroundTasks, guid: str) -> dict
Defined in: `webapi/routers/game_reparse.py`
'''

session = db_raw()
background_tasks.add_task(_reparse, session, guid)
background_tasks.add_task(_reparse, guid)

return JSONResponse(status_code=202, content={"detail": f"Reparse command sent for [{guid}]"})
6 changes: 1 addition & 5 deletions webapi/routers/game_upload.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,9 @@

from fastapi import Depends, File, Form, UploadFile
from fastapi.security import HTTPBasicCredentials
from sqlalchemy.orm import Session

from mgxhub import logger
from mgxhub.auth import WPRestAPI
from mgxhub.db import db_dep
from mgxhub.processor import FileProcessor
from webapi import app
from webapi.authdepends import security
Expand All @@ -20,8 +18,7 @@ async def upload_a_record(
lastmod: str = Form(''),
s3replace: bool = Form(False),
cleanup: bool = Form(True),
creds: HTTPBasicCredentials = Depends(security),
db: Session = Depends(db_dep)
creds: HTTPBasicCredentials = Depends(security)
):
'''Upload a record file to the server.
Expand Down Expand Up @@ -50,7 +47,6 @@ async def upload_a_record(
lastmod = datetime.now().isoformat()

processed = FileProcessor(
db,
recfile.file,
syncproc=False,
s3replace=s3replace,
Expand Down

0 comments on commit f26e87e

Please sign in to comment.