Skip to content

Commit

Permalink
some fix; add migrate script
Browse files Browse the repository at this point in the history
  • Loading branch information
lichifeng committed Mar 30, 2024
1 parent 2cd9523 commit 55d407f
Show file tree
Hide file tree
Showing 8 changed files with 324 additions and 4 deletions.
2 changes: 1 addition & 1 deletion mgxhub/db/operation/search_games.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ def search_games(criteria: SearchCriteria) -> dict:
if criteria.order_desc:
order_by = desc(order_by)

query = query.order_by(order_by).limit(criteria.page_size).offset(criteria.page * criteria.page_size)
query = query.order_by(order_by).limit(criteria.page_size).offset((criteria.page - 1) * criteria.page_size)

games = [{
'game_guid': game.game_guid,
Expand Down
2 changes: 1 addition & 1 deletion mgxhub/model/searchcriteria.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
class SearchCriteria(BaseModel):
'''Search criteria for the search API'''

page: Optional[int] = Field(default=0, ge=0)
page: Optional[int] = Field(default=1, ge=1)
page_size: Optional[int] = Field(default=100, ge=1)
order_by: Optional[str] = Field(default=None)
order_desc: Optional[bool] = Field(default=False)
Expand Down
2 changes: 2 additions & 0 deletions mgxhub/processor/proc_record.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ def process_record(
recpath (str): The path of the record file to be processed.
waitio (bool): Whether to wait for the I/O tasks to complete. Like saving to S3 and DB ops.
opts (str): Options for the processor.
s3replace (bool): Whether to replace the existing file in S3.
cleanup (bool): Whether to delete the file after processing.
Returns:
dict: The result of the processing.
Expand Down
227 changes: 227 additions & 0 deletions migrate/migrate_main.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,227 @@
import hashlib
import os
import shutil
import tempfile
import zipfile
from datetime import datetime

import requests
from pymongo import MongoClient
from sqlalchemy import Column, DateTime, Integer, String, create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from tqdm import tqdm

# Connection settings and paths — placeholders; replace before running the migration.
MONGO_URI = 'mongodb://username:password@localhost:27017/'
RECORDS_DIR = '/records'
API_URL = 'http://example.com/api/upload'

# Name of the SQLite log database, timestamped so each migration run gets its own file.
database_name = f"migrate_{datetime.now().strftime('%Y%m%d%H%M%S')}.db"

# SQLite engine used only to record migration results.
engine = create_engine(f'sqlite:///{database_name}')

# Declarative ORM base class.
Base = declarative_base()

# The Log table is defined below.

class Log(Base):
    """ORM model: one row per processed source file, recording the migration outcome."""
    __tablename__ = 'log'
    id = Column(Integer, primary_key=True)
    srcfile = Column(String)    # original zip filename found in RECORDS_DIR
    mongid = Column(String)     # matched MongoDB document _id (as string), if any
    md5 = Column(String)        # md5 hex digest of the extracted record file
    found = Column(DateTime)    # taken from record['firstFound'] when present
    updated = Column(DateTime)  # taken from record['lastUpdated'] when present
    gametime = Column(DateTime)  # in-game timestamp from record['gameTime']
    status = Column(Integer)    # reserved; currently always written as None
    message = Column(String)    # error/informational text for this file


# Create the log table in the SQLite database.
Base.metadata.create_all(engine)

# Session factory and session used to persist Log rows.
Session = sessionmaker(bind=engine)
session = Session()

# Connect to MongoDB, the source of the legacy record metadata.
client = MongoClient(MONGO_URI)
mongodb = client.mgxhub
records_collection = mongodb.records


# Temporary working directory for extraction and batch zip assembly.
temp_dir = tempfile.mkdtemp()


# Batch state: every N extracted files are packed into one zip and uploaded together.
N = 10  # number of files per upload batch
files_to_zip = []   # paths (in temp_dir) awaiting the next batch upload
queued_logs = []    # Log rows written to SQLite only after their batch uploads

# Progress bar over the entries of the source record directory.
pbar = tqdm(os.listdir(RECORDS_DIR))
# Main migration loop: for each source zip, extract the record, match it against
# MongoDB by md5, rename/stamp it, and upload in batches of N files.
for entry in pbar:
    filepath = os.path.join(RECORDS_DIR, entry)
    if not (os.path.isfile(filepath) and entry.endswith('.zip')):
        continue

    # Defined up-front so the `except` handler below can always reference them.
    # (Previously these could be unbound when an error occurred early, turning
    # the real exception into a NameError inside the handler.)
    md5_result = None
    gametime = None
    try:
        # Extract the zip into the temp dir; assumes one record file per archive.
        with zipfile.ZipFile(filepath, 'r') as zip_ref:
            zip_ref.extractall(temp_dir)
            extracted_file = zip_ref.namelist()[0]
        extracted_path = os.path.join(temp_dir, extracted_file)

        # md5 of the extracted record, used to look up the MongoDB document.
        with open(extracted_path, 'rb') as file_to_check:
            md5_result = hashlib.md5(file_to_check.read()).hexdigest()

        record = records_collection.find_one({'files.md5': md5_result})
        if record:
            # Stamp the file with the in-game time so the mtime survives migration.
            gametime = record.get('gameTime')
            if gametime:
                os.utime(extracted_path, (gametime.timestamp(), gametime.timestamp()))

            # Choose the longest known filename among the legacy names and the
            # `files` entries; fall back to the extracted name when neither exists
            # (previously a missing key raised NameError and dropped the file).
            candidates = [extracted_file]
            if 'legacy' in record and 'filenames' in record['legacy']:
                candidates.append(max(record['legacy']['filenames'], key=len))
            if record.get('files'):
                candidates.append(max(record['files'], key=lambda x: len(x['filename']))['filename'])
            new_filename = max(candidates, key=len)

            # De-duplicate target names already queued in the current batch.
            new_filepath = os.path.join(temp_dir, new_filename)
            if new_filepath in files_to_zip:
                base, ext = os.path.splitext(new_filepath)
                suffix = 1
                while f"{base}_{suffix}{ext}" in files_to_zip:
                    suffix += 1
                new_filepath = f"{base}_{suffix}{ext}"
            os.rename(extracted_path, new_filepath)
            files_to_zip.append(new_filepath)

            # Queue the success log row; it is flushed together with the batch
            # upload so SQLite only records files that actually went out.
            updated = record.get('lastUpdated', datetime.now())
            found = record.get('firstFound', datetime.now())
            queued_logs.append(Log(srcfile=entry, mongid=str(record['_id']), md5=md5_result,
                                   found=found, updated=updated,
                                   gametime=gametime, status=None, message=None))

            # Batch full: zip it up, upload, persist logs, clean up.
            if len(files_to_zip) >= N:
                zip_filename = f'batch_{datetime.now().strftime("%Y%m%d%H%M%S")}.zip'
                zip_filepath = os.path.join(temp_dir, zip_filename)
                with zipfile.ZipFile(zip_filepath, 'w', zipfile.ZIP_DEFLATED) as zip_file:
                    for file in files_to_zip:
                        zip_file.write(file, arcname=os.path.basename(file))

                # Upload the batch to the API.
                with open(zip_filepath, 'rb') as file_to_upload:
                    size_mb = round(os.path.getsize(zip_filepath) / (1024 * 1024), 2)
                    # Show upload progress in the bar description.
                    pbar.set_description(f'Uploading {entry} ({size_mb} MB)...')
                    response = requests.post(API_URL, files={'recfile': file_to_upload},
                                             auth=('username', 'password'))
                if response.status_code != 200:
                    # Surface failed uploads without aborting the run.
                    pbar.write(f'Upload returned {response.status_code} for {zip_filename}')

                # Flush the queued log rows in one commit.
                session.add_all(queued_logs)
                session.commit()
                queued_logs = []

                # Remove the batch's files and the zip from the temp dir.
                for file in files_to_zip:
                    os.remove(file)
                os.remove(zip_filepath)
                files_to_zip = []
        else:
            # No MongoDB match: park the source zip for manual review.
            exception_dir = os.path.join(os.getcwd(), 'exception')
            os.makedirs(exception_dir, exist_ok=True)  # previously assumed to exist
            shutil.move(filepath, os.path.join(exception_dir, entry))

            # Record the miss in SQLite.
            session.add(Log(srcfile=entry, md5=md5_result,
                            status=None, message='File not found in MongoDB.'))
            session.commit()
    except Exception as e:
        # Record the failure; md5_result may legitimately be None here.
        session.add(Log(srcfile=entry, md5=md5_result,
                        status=None, message=str(e)))
        session.commit()

# Flush the leftover files (fewer than N) as one final batch.
if files_to_zip:
    # Create the final batch zip.
    zip_filename = f'batch_{datetime.now().strftime("%Y%m%d%H%M%S")}.zip'
    zip_filepath = os.path.join(temp_dir, zip_filename)
    with zipfile.ZipFile(zip_filepath, 'w', zipfile.ZIP_DEFLATED) as zip_file:
        for file in files_to_zip:
            zip_file.write(file, arcname=os.path.basename(file))

    # Upload the batch to the API.
    # NOTE(review): the response is not checked — a failed upload is silent here.
    with open(zip_filepath, 'rb') as file_to_upload:
        files = {'recfile': file_to_upload}
        # lastmod = datetime.fromtimestamp(os.path.getmtime(zip_filepath)).isoformat()
        # data = {'lastmod': lastmod}
        # response = requests.post(API_URL, files=files, data=data, auth=('username', 'password'))
        response = requests.post(API_URL, files=files, auth=('username', 'password'))

    # Persist the queued log rows for this last batch.
    for log_entry in queued_logs:
        session.add(log_entry)
    session.commit()
    queued_logs = []

    # Remove the batch's files and zip from the temp dir.
    for file in files_to_zip:
        os.remove(file)
    os.remove(zip_filepath)

# Remove the temporary working directory.
shutil.rmtree(temp_dir)

# Cross-check: every md5 listed in MongoDB should now have a Log row in SQLite;
# record any that are missing so they can be investigated after the migration.
doc_count = records_collection.count_documents({})

for doc in tqdm(records_collection.find(), total=doc_count):
    for file_info in doc.get('files', []):
        digest = file_info.get('md5')
        if not digest:
            continue
        if session.query(Log).filter_by(md5=digest).first() is None:
            # Missing in SQLite — log the discrepancy.
            session.add(Log(mongid=str(doc['_id']), md5=digest,
                            status=None, message='MD5 not found in SQLite.'))
            session.commit()

# Close the SQLite session.
session.close()
29 changes: 29 additions & 0 deletions migrate/old_keys.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
from pymongo import MongoClient
from tqdm import tqdm


def get_all_keys(doc, prefix=''):
    """Return the set of dotted key paths present in a (possibly nested) dict.

    A nested dict contributes both its own path and every descendant path,
    e.g. {'a': {'b': 1}} -> {'a', 'a.b'}. `prefix` is prepended to every path.
    """
    found = set()
    for name, value in doc.items():
        path = f'{prefix}.{name}' if prefix else name
        found.add(path)
        if isinstance(value, dict):
            # Recurse with the full path as the new prefix.
            found |= get_all_keys(value, path)
    return found


# Connect to MongoDB.
# NOTE(review): credentials and host are hardcoded — move to config/env before reuse.
mongo_client = MongoClient('mongodb://root:example@172.20.0.3:27017/')
mongo_db = mongo_client['mgxhub']
mongo_collection = mongo_db['records']

# Collect the union of all dotted key paths across every document.
all_keys = set()
total_documents = mongo_collection.count_documents({})
for document in tqdm(mongo_collection.find(), total=total_documents):
    all_keys.update(get_all_keys(document))

# Print the sorted key paths, one per line.
for key in sorted(all_keys):
    print(key)
58 changes: 58 additions & 0 deletions migrate/oldkeys.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
_id
chat
chat.0
chat.1
chat.13
chat.2
chat.3
chat.32540
chat.32560
chat.32764
chat.32765
chat.32766
chat.32767
chat.4
chat.5
chat.5672
chat.6
chat.65535
chat.7
chat.7160
chat.8
duration
files
firstFound
gameTime
guid
includeAI
instruction
isMultiplayer
lastUpdated
legacy
legacy.filenames
legacy.id
map
map.name
map.nameEn
map.size
map.sizeEn
modifiedDate
notOK
players
population
rawEncoding
speed
speedEn
teamMode
version
version.build
version.code
version.interVer
version.logVer
version.newVer
version.rawStr
version.saveVer
version.scenarioVersion
victory
victory.type
victory.typeEn
2 changes: 1 addition & 1 deletion webapi/routers/game_detail.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ async def get_game(guid: str, lang: str = 'en') -> GameDetail | None:
raise HTTPException(status_code=404, detail=f"Game profile [{guid}] not found")

player_data = db().query(Player).filter(Player.game_guid == guid).all()
file_data = db().query(File).filter(File.game_guid == guid).all()
file_data = db().query(File).filter(File.game_guid == guid).limit(10).all()
chat_data = db().query(Chat.chat_time, Chat.chat_content)\
.filter(Chat.game_guid == guid)\
.group_by(Chat.chat_time, Chat.chat_content)\
Expand Down
6 changes: 5 additions & 1 deletion webapi/routers/game_upload.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from fastapi import Depends, File, Form, UploadFile
from fastapi.security import HTTPBasicCredentials

from mgxhub import logger
from mgxhub.auth import WPRestAPI
from mgxhub.processor import FileProcessor
from webapi import app
Expand Down Expand Up @@ -36,7 +37,10 @@ async def upload_a_record(
if s3replace and not WPRestAPI(creds.username, creds.password).need_admin_login(brutal_term=False):
s3replace = False

if not lastmod:
try:
lastmod = datetime.fromtimestamp(int(lastmod)).isoformat()
except Exception as e:
logger.warning(f'Invalid lastmod: {e}')
lastmod = datetime.now().isoformat()

processed = FileProcessor(
Expand Down

0 comments on commit 55d407f

Please sign in to comment.