directory watcher was replaced by a queue
lichifeng committed Apr 20, 2024
1 parent 290ca81 commit d1dadc1
Showing 8 changed files with 108 additions and 36 deletions.
6 changes: 5 additions & 1 deletion mgxhub/__init__.py
@@ -1,5 +1,9 @@
 '''mgxhub main source code'''
 
+import queue
+
 from .config import cfg
-from .logger import logger
 from .db import db
+from .logger import logger
 
+proc_queue = queue.Queue()
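Since `proc_queue` is created at import time in `mgxhub/__init__.py`, any module can import it and use it as the shared handoff point between the upload endpoint (producer) and the watcher workers (consumers). A minimal sketch of that pattern, with a hypothetical on_upload() standing in for the real API endpoint:

from mgxhub import proc_queue

def on_upload(saved_path: str) -> None:
    # Producer side: the endpoint enqueues the stored file's path.
    proc_queue.put(saved_path)

def worker_step() -> None:
    # Consumer side: a worker blocks until a task path arrives.
    task_path = proc_queue.get()
    try:
        print(f'processing {task_path}')  # stand-in for FileProcessor
    finally:
        proc_queue.task_done()  # keeps Queue.join() accounting correct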
1 change: 0 additions & 1 deletion mgxhub/db/operation/get_player_rating.py
@@ -3,7 +3,6 @@
 from sqlalchemy import asc, desc, func, select
 from sqlalchemy.orm import Session
 
-from mgxhub import db
 from mgxhub.model.orm import Rating
 
 # pylint: disable=not-callable
4 changes: 2 additions & 2 deletions mgxhub/db/sqlite3.py
@@ -5,7 +5,7 @@
 from sqlalchemy import create_engine
 from sqlalchemy.orm import Session, sessionmaker
 
-from mgxhub import cfg, logger
+from mgxhub import cfg
 from mgxhub.model.orm import Base
 from mgxhub.singleton import Singleton
 
@@ -70,4 +70,4 @@ def prepare(self, db_path: str | None = None) -> None:
         )
         Base.metadata.create_all(self._db_engine)
         self._db_sessionlocal = sessionmaker(autocommit=False, autoflush=False, bind=self._db_engine)
-        logger.debug(f"SQLite prepared: {self._db_path}")
+        print(f"SQLite prepared: {self._db_path}")
7 changes: 3 additions & 4 deletions mgxhub/processor/file_processor.py
@@ -112,12 +112,11 @@ def _process(self) -> dict:
             logger.debug(f'Proc(record): {self._filepath}')
             self._output = process_record(self._session, self._filepath, self._syncproc,
                                           '-b', self._s3replace, self._cleanup)
-
-        if fileext in ACCEPTED_COMPRESSED_TYPES:
+        elif fileext in ACCEPTED_COMPRESSED_TYPES:
             logger.debug(f'Proc(compressed): {self._filepath}')
             self._output = process_compressed(self._filepath, self._cleanup)
-
-        return {'status': 'invalid', 'message': 'unsupported file type'}
+        else:
+            self._output = {'status': 'invalid', 'message': 'unsupported file type'}
 
     def result(self) -> dict:
         '''Return the processing result.'''
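The hunk above is a real bug fix, not a cleanup: in the old code the two branches were independent `if` statements followed by an unconditional `return {'status': 'invalid', ...}`, so `_process()` reported `invalid` even when it had just handled a record or an archive. The `elif`/`else` chain instead stores every outcome in `self._output`, which `result()` returns. A condensed sketch of the corrected flow, with stand-in type sets and handlers:

ACCEPTED_RECORD_TYPES = {'.mgx', '.mgz'}      # stand-in values
ACCEPTED_COMPRESSED_TYPES = {'.zip', '.7z'}   # stand-in values

def dispatch(fileext: str) -> dict:
    if fileext in ACCEPTED_RECORD_TYPES:
        return {'status': 'record'}       # stand-in for process_record()
    elif fileext in ACCEPTED_COMPRESSED_TYPES:
        return {'status': 'compressed'}   # stand-in for process_compressed()
    else:
        return {'status': 'invalid', 'message': 'unsupported file type'}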
2 changes: 2 additions & 0 deletions mgxhub/processor/proc_compressed.py
@@ -7,6 +7,7 @@
 import patoolib
 
 from mgxhub import cfg, logger
+from mgxhub.watcher.scanner import scan
 
 from .allowed_types import ACCEPTED_COMPRESSED_TYPES
 from .move2error import move_to_error
@@ -16,6 +17,7 @@ def _decompress(filepath: str, cleanup: bool = True) -> True:
     with tempfile.TemporaryDirectory(prefix='unzip_', dir=cfg.get('system', 'uploaddir'), delete=False) as temp_dir:
         try:
             patoolib.extract_archive(filepath, outdir=temp_dir, interactive=False, verbosity=-1)
+            scan(temp_dir)
             if cleanup and os.path.exists(filepath):
                 os.remove(filepath)
             return True
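Two notes on this hunk. `scan(temp_dir)` enqueues every extracted file, and the extraction directory is deliberately kept alive (`delete=False`) because those files are consumed asynchronously by the queue workers. Also, the `delete` parameter of `tempfile.TemporaryDirectory` only exists on Python 3.12+; on older interpreters a rough equivalent is `mkdtemp`, which never removes the directory:

import tempfile

# Rough pre-3.12 equivalent of TemporaryDirectory(..., delete=False);
# the upload dir path is a stand-in for cfg.get('system', 'uploaddir').
temp_dir = tempfile.mkdtemp(prefix='unzip_', dir='/tmp/uploads')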
26 changes: 26 additions & 0 deletions mgxhub/watcher/scanner.py
@@ -0,0 +1,26 @@
+'''Scan a directory in the upload dir and put its files into the process queue'''
+
+import os
+
+from mgxhub import proc_queue
+
+
+def scan(dirpath: str):
+    '''Scan a directory in the upload dir and put its files into the process queue.
+    Args:
+        dirpath (str): The directory path to scan.
+    '''
+
+    for root, dirs, files in os.walk(dirpath, topdown=False):
+        for filename in files:
+            file_path = os.path.join(root, filename)
+            proc_queue.put(file_path)  # The processor will try to remove the empty parent directory.
+        for dir in dirs:
+            # Try to remove the directory if it is empty; this works because topdown=False.
+            current_dir_path = os.path.join(root, dir)
+            if os.path.isdir(current_dir_path) and not os.listdir(current_dir_path):
+                try:
+                    os.rmdir(current_dir_path)
+                except OSError:
+                    pass
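A quick usage sketch under assumed paths: scan() walks bottom-up, enqueues every file it finds, and prunes directories that are already empty, leaving the rest for the workers to clean up:

from mgxhub import proc_queue
from mgxhub.watcher.scanner import scan

scan('/tmp/uploads/unzip_abc123')  # hypothetical extraction directory

# Drain what the scan enqueued (the real consumer is RecordWatcher).
while not proc_queue.empty():
    print('queued:', proc_queue.get())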
65 changes: 37 additions & 28 deletions mgxhub/watcher/watcher.py
@@ -1,21 +1,37 @@
-'''Used to watch the work directory for new files and process them'''
+'''Used to watch the queue and process tasks in it.
+
+I tried monitoring a directory for new files and processing them, but there
+is a problem with that solution: when a file is still being written into the
+directory, the watcher tries to process it, and this causes an error because
+the file is not yet complete. Adding a delay before processing the file makes
+the process slow, and solutions like **pyinotify** or **watchdog** make the
+process complex.
+
+A **watcher** is hence not a good idea, I think.
+
+My solution is to limit file input to an API endpoint; uploading files via
+SFTP or other methods is not considered. When a file is uploaded, the API
+adds a task to a queue, and workers then process the tasks in the queue.
+
+Under this design, files in the upload dir should only come from
+_decompress() of `proc_compressed.py`.
+'''
 
 import atexit
 import fcntl
 import os
 import threading
 import time
 from concurrent.futures import ThreadPoolExecutor
 from queue import Queue
 
-from mgxhub.config import cfg
-from mgxhub.db import db
-from mgxhub.logger import logger
+from mgxhub import cfg, db, logger, proc_queue
 from mgxhub.processor import FileProcessor
 
+from .scanner import scan
+
 
 class RecordWatcher:
-    '''Watches the work directory for new files and processes them'''
+    '''Watches the queue and processes tasks in it.'''
 
     def __init__(self, max_workers=4):
         '''Initialize the watcher'''
@@ -35,12 +51,14 @@ def __init__(self, max_workers=4):
         self.work_dir = cfg.get('system', 'uploaddir')
         os.makedirs(self.work_dir, exist_ok=True)
 
+        scan(self.work_dir)
+
         if self.work_dir and os.path.isdir(self.work_dir):
-            self.file_queue = Queue()
+            self.q = proc_queue
             self.max_workers = max_workers
             self.thread = threading.Thread(target=self._watch, daemon=True)
             self.thread.start()
-            logger.info(f"[Watcher] Monitoring directory: {self.work_dir}")
+            logger.info("[Watcher] Monitoring queue...")
 
     def _remove_lock_file(self):
         '''Remove the lock file'''
@@ -52,10 +70,9 @@ def _watch(self):
         '''Watch the work directory for new files and process them'''
         while True:
             with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
-                self._scan(self.work_dir)
-                while not self.file_queue.empty():
-                    file_path = self.file_queue.get()
-                    executor.submit(self._process_file, file_path)
+                while not self.q.empty():
+                    task_path = self.q.get()
+                    executor.submit(self._process_file, task_path)
                 time.sleep(1)
 
     def _process_file(self, file_path):
@@ -67,24 +84,16 @@ def _process_file(self, file_path):
logger.debug(f"[Watcher] {file_path}: {file_processor.result().get('status', 'unknown')}")
if os.path.isfile(file_path):
os.remove(file_path)

# Try remove parent directory if it is empty
try:
os.rmdir(os.path.dirname(file_path))
except OSError:
pass
parentdir = os.path.dirname(file_path)
if os.path.isdir(parentdir) and not os.listdir(parentdir):
try:
os.rmdir(parentdir)
except OSError:
pass
except Exception as e:
logger.error(f"[Watcher] Error [{file_path}]: {e}")
# This exception may due to unfinished file writing, so we wait for a while
time.sleep(2)
return
finally:
self.q.task_done()
session.close()

def _scan(self, dirpath: str):
for root, dirs, files in os.walk(dirpath):
for filename in files:
file_path = os.path.join(root, filename)
self.file_queue.put(file_path)
for inner_dir in dirs:
inner_path = os.path.join(root, inner_dir)
self._scan(inner_path)
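The `fcntl`/`atexit` imports and the `_remove_lock_file()` method are only partially visible in this diff, but they point at the usual single-instance guard: take an exclusive non-blocking flock on a lock file at startup and release it on exit. A minimal sketch of that generic pattern, under an assumed lock path; not necessarily the exact code in the collapsed hunks:

import atexit
import fcntl
import os

LOCK_PATH = '/tmp/record_watcher.lock'  # assumed location

def acquire_single_instance_lock() -> bool:
    '''Return True if this process now holds the watcher lock.'''
    fd = os.open(LOCK_PATH, os.O_CREAT | os.O_RDWR)
    try:
        # Fails immediately if another watcher already holds the lock.
        fcntl.flock(fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
    except OSError:
        os.close(fd)
        return False
    atexit.register(lambda: (fcntl.flock(fd, fcntl.LOCK_UN), os.close(fd)))
    return True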
33 changes: 33 additions & 0 deletions migrate/copy_exceptions.py
@@ -0,0 +1,33 @@
+# GPT-generated script, not revised yet.
+
+import os
+import shutil
+
+from sqlalchemy import MetaData, Table, create_engine, select
+from sqlalchemy.orm import sessionmaker
+
+# Create the SQLite database engine
+engine = create_engine('sqlite:///migrate_20240411190857.db')
+
+# Create a session
+Session = sessionmaker(bind=engine)
+session = Session()
+
+# Load the metadata of the log table
+metadata = MetaData()
+log_table = Table('log', metadata, autoload_with=engine)
+
+# Select all records whose message is not null
+query = select(log_table).where(log_table.c.message.is_not(None))
+results = session.execute(query)
+
+# Create the exceptions folder
+exceptions_dir = os.path.join(os.getcwd(), 'exception')
+os.makedirs(exceptions_dir, exist_ok=True)
+
+# Copy the source files of the selected records
+for result in results:
+    srcfile = result.srcfile
+    if srcfile:
+        dstfile = os.path.join(exceptions_dir, srcfile)
+        shutil.copyfile(f"/records/{srcfile}", dstfile)
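One fragile spot in scripts like this: rows returned by session.execute(select(...)) are tuple-like Row objects in SQLAlchemy 1.4/2.0, so attribute access (result.srcfile) works, while dict-style access needs Result.mappings(). A short sketch of the mapping-based variant, reusing the session and query above:

# Iterate the same query with dict-style rows via Result.mappings().
for row in session.execute(query).mappings():
    if row['srcfile']:
        print('would copy:', row['srcfile'])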
