Skip to content

Commit

Permalink
threadpool memory leak fix
Browse files Browse the repository at this point in the history
  • Loading branch information
lichifeng committed Apr 11, 2024
1 parent 993b8f7 commit 290ca81
Showing 1 changed file with 7 additions and 7 deletions.
14 changes: 7 additions & 7 deletions mgxhub/watcher/watcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
class RecordWatcher:
'''Watches the work directory for new files and processes them'''

def __init__(self, max_workers=2):
def __init__(self, max_workers=4):
'''Initialize the watcher'''

self.lock_file = "/tmp/mgxhub_record_watcher.lock"
Expand All @@ -37,7 +37,7 @@ def __init__(self, max_workers=2):

if self.work_dir and os.path.isdir(self.work_dir):
self.file_queue = Queue()
self.executor = ThreadPoolExecutor(max_workers=max_workers)
self.max_workers = max_workers
self.thread = threading.Thread(target=self._watch, daemon=True)
self.thread.start()
logger.info(f"[Watcher] Monitoring directory: {self.work_dir}")
Expand All @@ -50,12 +50,12 @@ def _remove_lock_file(self):

def _watch(self):
'''Watch the work directory for new files and process them'''

while True:
self._scan(self.work_dir)
while not self.file_queue.empty():
file_path = self.file_queue.get()
self.executor.submit(self._process_file, file_path)
with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
self._scan(self.work_dir)
while not self.file_queue.empty():
file_path = self.file_queue.get()
executor.submit(self._process_file, file_path)
time.sleep(1)

def _process_file(self, file_path):
Expand Down

0 comments on commit 290ca81

Please sign in to comment.