From 993b8f7f37f9746d394b3ebdb0954642410387f4 Mon Sep 17 00:00:00 2001 From: lichifeng Date: Thu, 11 Apr 2024 13:41:39 +0800 Subject: [PATCH] fixed db session commit confilict bugs --- mgxhub/db/sqlite3.py | 2 +- mgxhub/rating/__main__.py | 19 ++++++++++++------- mgxhub/watcher/watcher.py | 26 ++++++++++++-------------- 3 files changed, 25 insertions(+), 22 deletions(-) diff --git a/mgxhub/db/sqlite3.py b/mgxhub/db/sqlite3.py index 26992ec..c2992a8 100644 --- a/mgxhub/db/sqlite3.py +++ b/mgxhub/db/sqlite3.py @@ -65,7 +65,7 @@ def prepare(self, db_path: str | None = None) -> None: self._db_engine = create_engine( f"sqlite:///{self._db_path}", - connect_args={"check_same_thread": False}, + connect_args={"check_same_thread": False, "timeout": 60}, echo=(cfg.get('system', 'echosql').lower() == 'on') ) Base.metadata.create_all(self._db_engine) diff --git a/mgxhub/rating/__main__.py b/mgxhub/rating/__main__.py index 8f9e67d..b84368a 100644 --- a/mgxhub/rating/__main__.py +++ b/mgxhub/rating/__main__.py @@ -53,17 +53,22 @@ def main(db_path: str, duration_threshold: str, batch_size: str): start_time = time.time() try: - engine = create_engine(f"sqlite:///{db_path}", echo=False) + engine = create_engine(f"sqlite:///{db_path}", echo=False, connect_args={'timeout': 60}) session = Session(engine) elo = EloCalculator(session) duration_threshold = int(duration_threshold) batch_size = int(batch_size) - if duration_threshold > 0 and batch_size > 50000: - elo.update_ratings(duration_threshold, batch_size) - else: - elo.update_ratings(15 * 60 * 1000, 150000) - session.close() - engine.dispose() + try: + if duration_threshold > 0 and batch_size > 50000: + elo.update_ratings(duration_threshold, batch_size) + else: + elo.update_ratings(15 * 60 * 1000, 150000) + except Exception as e: + logger.error(f"Rating Error: {e}") + session.rollback() + finally: + session.close() + engine.dispose() finally: if os.path.exists(LOCK_FILE): os.remove(LOCK_FILE) diff --git a/mgxhub/watcher/watcher.py b/mgxhub/watcher/watcher.py index 51171d3..9920888 100644 --- a/mgxhub/watcher/watcher.py +++ b/mgxhub/watcher/watcher.py @@ -1,6 +1,7 @@ '''Used to watch the work directory for new files and process them''' import atexit +import fcntl import os import threading import time @@ -20,18 +21,14 @@ def __init__(self, max_workers=2): '''Initialize the watcher''' self.lock_file = "/tmp/mgxhub_record_watcher.lock" - if os.path.exists(self.lock_file): - with open(self.lock_file, "r", encoding="ascii") as file: - pid = int(file.read()) - try: - os.kill(pid, 0) - except OSError: # No such process - os.remove(self.lock_file) - else: # Process exists - return - - with open(self.lock_file, "w", encoding="ascii") as file: - file.write(str(os.getpid())) + self.file = open(self.lock_file, 'w', encoding='ascii') + + try: + fcntl.flock(self.file, fcntl.LOCK_EX | fcntl.LOCK_NB) + except IOError: + # another instance is running + print("Another watcher instance is running") + return atexit.register(self._remove_lock_file) @@ -47,8 +44,9 @@ def __init__(self, max_workers=2): def _remove_lock_file(self): '''Remove the lock file''' - if os.path.exists(self.lock_file): - os.remove(self.lock_file) + + fcntl.flock(self.file, fcntl.LOCK_UN) + self.file.close() def _watch(self): '''Watch the work directory for new files and process them'''