Skip to content

Commit

Permalink
fixed db session commit confilict bugs
Browse files Browse the repository at this point in the history
  • Loading branch information
lichifeng committed Apr 11, 2024
1 parent 97c7100 commit 993b8f7
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 22 deletions.
2 changes: 1 addition & 1 deletion mgxhub/db/sqlite3.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ def prepare(self, db_path: str | None = None) -> None:

self._db_engine = create_engine(
f"sqlite:///{self._db_path}",
connect_args={"check_same_thread": False},
connect_args={"check_same_thread": False, "timeout": 60},
echo=(cfg.get('system', 'echosql').lower() == 'on')
)
Base.metadata.create_all(self._db_engine)
Expand Down
19 changes: 12 additions & 7 deletions mgxhub/rating/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,17 +53,22 @@ def main(db_path: str, duration_threshold: str, batch_size: str):
start_time = time.time()

try:
engine = create_engine(f"sqlite:///{db_path}", echo=False)
engine = create_engine(f"sqlite:///{db_path}", echo=False, connect_args={'timeout': 60})
session = Session(engine)
elo = EloCalculator(session)
duration_threshold = int(duration_threshold)
batch_size = int(batch_size)
if duration_threshold > 0 and batch_size > 50000:
elo.update_ratings(duration_threshold, batch_size)
else:
elo.update_ratings(15 * 60 * 1000, 150000)
session.close()
engine.dispose()
try:
if duration_threshold > 0 and batch_size > 50000:
elo.update_ratings(duration_threshold, batch_size)
else:
elo.update_ratings(15 * 60 * 1000, 150000)
except Exception as e:
logger.error(f"Rating Error: {e}")
session.rollback()
finally:
session.close()
engine.dispose()
finally:
if os.path.exists(LOCK_FILE):
os.remove(LOCK_FILE)
Expand Down
26 changes: 12 additions & 14 deletions mgxhub/watcher/watcher.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
'''Used to watch the work directory for new files and process them'''

import atexit
import fcntl
import os
import threading
import time
Expand All @@ -20,18 +21,14 @@ def __init__(self, max_workers=2):
'''Initialize the watcher'''

self.lock_file = "/tmp/mgxhub_record_watcher.lock"
if os.path.exists(self.lock_file):
with open(self.lock_file, "r", encoding="ascii") as file:
pid = int(file.read())
try:
os.kill(pid, 0)
except OSError: # No such process
os.remove(self.lock_file)
else: # Process exists
return

with open(self.lock_file, "w", encoding="ascii") as file:
file.write(str(os.getpid()))
self.file = open(self.lock_file, 'w', encoding='ascii')

try:
fcntl.flock(self.file, fcntl.LOCK_EX | fcntl.LOCK_NB)
except IOError:
# another instance is running
print("Another watcher instance is running")
return

atexit.register(self._remove_lock_file)

Expand All @@ -47,8 +44,9 @@ def __init__(self, max_workers=2):

def _remove_lock_file(self):
'''Remove the lock file'''
if os.path.exists(self.lock_file):
os.remove(self.lock_file)

fcntl.flock(self.file, fcntl.LOCK_UN)
self.file.close()

def _watch(self):
'''Watch the work directory for new files and process them'''
Expand Down

0 comments on commit 993b8f7

Please sign in to comment.