Skip to content

Commit

Permalink
fix(mongodb): 修正语法 TencentBlueKing#3379
Browse files Browse the repository at this point in the history
  • Loading branch information
cycker authored and iSecloud committed Feb 27, 2024
1 parent b3cdfc2 commit 1379102
Show file tree
Hide file tree
Showing 11 changed files with 82 additions and 69 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,17 +10,25 @@ import (
"fmt"
"os"
"path"
"path/filepath"

"github.com/go-playground/validator/v10"
"github.com/pkg/errors"
"go.mongodb.org/mongo-driver/mongo"
)

// 备份
// 1. 分析参数,确定要备份的库和表
// 2. 执行备份
// 3. 上报备份记录
// 4. 上报到备份系统,等待备份系统完成
// BsTaskArg 备份任务,作为参数传入
type BsTaskArg struct {
TaskId string `json:"task_id"`
FileName string `json:"file_name"`
}

// 回档
// 1. 将备份文件解压到临时目录.
// 2. 检查目标MongoDB中,没有要恢复的库和表.
// 3. 执行恢复,并将恢复日志写入到文件中.
// 4. 检查恢复日志,如果有错误,返回错误.
// 5. 删除临时目录

// restoreParam 备份任务参数,由前端传入
type restoreParam struct {
Expand All @@ -31,10 +39,11 @@ type restoreParam struct {
InstanceType string `json:"instanceType"`

Args struct {
SrcFile string `json:"srcFile"`
IsPartial bool `json:"isPartial"` // 为true时,备份指定库和表
Oplog bool `json:"oplog"` // 是否备份oplog,只有在IsPartial为false可为true
NsFilter NsFilterArg `json:"nsFilter"`
RecoverDir string `json:"RecoverDir"` // /data/dbbak/recover_mg/
SrcFile []BsTaskArg `json:"srcFile"` // 目前只需要1个文件,但是为了兼容,还是使用数组.
IsPartial bool `json:"isPartial"` // 为true时,备份指定库和表
Oplog bool `json:"oplog"` // 是否备份oplog,只有在IsPartial为false可为true
NsFilter NsFilterArg `json:"nsFilter"`
} `json:"Args"`
}

Expand Down Expand Up @@ -102,17 +111,18 @@ func (s *restoreJob) doLogicalRestore() error {
helper := logical.NewMongoRestoreHelper(s.MongoInst, s.MongoRestoreBin, s.param.AdminUsername,
s.param.AdminPassword, "admin", s.OsUser)

fileSize, err := util.GetFileSize(s.param.Args.SrcFile)
srcFilePath := filepath.Join(s.param.Args.RecoverDir, s.param.Args.SrcFile[0].FileName)
fileSize, err := util.GetFileSize(srcFilePath)
if err != nil {
return errors.Wrap(err, "GetFileSize")
}

log.Info("start untar file %s, fileSize %d", s.param.Args.SrcFile, fileSize)
dstDir, err := logical.UntarFile(s.param.Args.SrcFile)
log.Info("start untar file %s, fileSize %d", srcFilePath, fileSize)
dstDir, err := logical.UntarFile(srcFilePath)
if err != nil {
return errors.Wrap(err, "UntarFile")
}
log.Info("end untar file %s, dstDir %s", s.param.Args.SrcFile, dstDir)
log.Info("end untar file %s, dstDir %s", srcFilePath, dstDir)
dstDirWithDump := path.Join(dstDir, "dump")
// get DbCollection from Dir
dbColList, err := logical.GetDbCollectionFromDir(dstDirWithDump)
Expand Down Expand Up @@ -230,6 +240,11 @@ func (s *restoreJob) Init(runtime *jobruntime.JobGenericRuntime) error {
return err
}

if util.FileExists(s.param.Args.RecoverDir) == false {
return errors.New("recover dir is empty")
}

// prepare mongo client and mongodump path
s.MongoInst = mymongo.NewMongoHost(
s.param.IP, fmt.Sprintf("%d", s.param.Port),
"admin", s.param.AdminUsername, s.param.AdminPassword, "", s.param.IP)
Expand Down
11 changes: 10 additions & 1 deletion dbm-services/mongo/db-tools/dbactuator/pkg/util/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
"os"
)

// FileExists 检查目录是否已经存在
// FileExists 检查path是否已经存在
func FileExists(path string) bool {
_, err := os.Stat(path)
if err != nil {
Expand All @@ -18,6 +18,15 @@ func FileExists(path string) bool {
return true
}

// DirExists 检查目录是否已经存在
func DirExists(path string) bool {
f, err := os.Stat(path)
if err != nil {
return os.IsExist(err)
}
return f.IsDir()
}

// GetFileMd5 求文件md5sum值
func GetFileMd5(fileAbPath string) (md5sum string, err error) {
rFile, err := os.Open(fileAbPath)
Expand Down
12 changes: 8 additions & 4 deletions dbm-services/mongo/db-tools/dbmon/cmd/mongojob/backup_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
"time"
)

// BackupTaskOption TODO
// BackupTaskOption 备份任务参数
type BackupTaskOption struct {
// TaskName 任务名称
TaskName string `json:"task_name"`
Expand All @@ -28,7 +28,7 @@ type BackupTaskOption struct {
Labels string `json:"labels"`
}

// BackupTask TODO
// BackupTask 备份任务
type BackupTask struct {
}

Expand All @@ -38,6 +38,9 @@ func NewBackupTask() *BackupTask {
}

// Do 执行任务
// 组装命令行,调用MongoToolKit执行备份任务,返回错误
// 调用MongoToolKit执行备份任务的原因是,MongoToolKit已经实现了备份的逻辑,不需要重复实现
// 也可实现备份时可重启dbmon,但目前没有实现
func (task *BackupTask) Do(option *BackupTaskOption) error {

backupType := "AUTO"
Expand All @@ -53,14 +56,15 @@ func (task *BackupTask) Do(option *BackupTaskOption) error {
if option.SendToBs {
cb.Append("--send-to-bs")
}

if option.RemoveOldFileFirst {
cb.Append("--remove-old-file-first")
}

cmdLine := cb.GetCmdLine("", false)
// dbmon的日志不上传Es,可以打印密码.
cmdLine := cb.GetCmdLine2(false)
mylog.Logger.Info(fmt.Sprintf("cmdLine: %s", cmdLine))

// cmd, args := cb.GetCmd()
o, err := cb.Run2(time.Hour * 24)
mylog.Logger.Info(
fmt.Sprintf("Exec %s cost %0.1f Seconds, stdout: %s, stderr %s",
Expand Down
2 changes: 1 addition & 1 deletion dbm-services/mongo/db-tools/dbmon/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ type BkDbmLabel struct {
App string `json:"app" mapstructure:"app" yaml:"app"`
AppName string `json:"app_name" mapstructure:"-" yaml:"app_name"`
ClusterDomain string `json:"cluster_domain" mapstructure:"cluster_domain" yaml:"cluster_domain"`
ClusterId string `json:"cluster_id" mapstructure:"cluster_id" yaml:"cluster_id"`
ClusterId int64 `json:"cluster_id" mapstructure:"cluster_id" yaml:"cluster_id"`
ClusterName string `json:"cluster_name" mapstructure:"cluster_name" yaml:"cluster_name"`
ClusterType string `json:"cluster_type" mapstructure:"cluster_type" yaml:"cluster_type"`
RoleType string `json:"role_type" mapstructure:"role_type" yaml:"role_type"` // shardsvr,mongos,configsvr
Expand Down
6 changes: 3 additions & 3 deletions dbm-services/mongo/db-tools/dbmon/pkg/consts/consts.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,11 @@ const (
const (

// MongoTypeShardedCluster TODO
MongoTypeShardedCluster = "ShardedCluster"
MongoTypeShardedCluster = "MongoShardedCluster"
// MongoTypeReplicaSet TODO
MongoTypeReplicaSet = "ReplicaSet"
MongoTypeReplicaSet = "MongoReplicaSet"
// MongoTypeStandalone TODO
MongoTypeStandalone = "Standalone"
MongoTypeStandalone = "MongoStandalone"
)

// time layout
Expand Down
27 changes: 1 addition & 26 deletions dbm-services/mongo/db-tools/dbmon/pkg/linuxproc/net_tcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,10 +70,9 @@ func ParseHexAddr(host string) (ip string, port int, err error) {
if err != nil {
return "", 0, err
}

ip = InetNtoA(n)

n2, err := strconv.ParseUint(fs[1], 16, 32)
n2, err := strconv.ParseInt(fs[1], 16, 32)
if err != nil {
return "", 0, err
}
Expand All @@ -87,27 +86,6 @@ const ProcNetTcpPath = "/proc/net/tcp"
const ESTABLISHED = 1
const LISTEN = 10

/*
ProcNetTcp
读取/proc/net/tcp文件
sl local_address rem_address st tx_queue rx_queue tr tm->when retrnsmt uid timeout inode
0: 0100007F:1B1E 00000000:0000 0A 00000000:00000000 00:00000000
-- 00000000 0 0 442153839 1 ffff880101899500 100 0 0 10 0
TCP_ESTABLISHED(1),
TCP_SYN_SENT,
TCP_SYN_RECV,
TCP_FIN_WAIT1,
TCP_FIN_WAIT2,
TCP_TIME_WAIT,
TCP_CLOSE,
TCP_CLOSE_WAIT,
TCP_LAST_ACK,
TCP_LISTEN,
TCP_CLOSING, Now a valid state
TCP_NEW_SYN_RECV,
TCP_MAX_STATES Leave at the end!
*/

// ProcNetTcp 读取/proc/net/tcp文件
func ProcNetTcp(input []byte) (rows []NetTcp, err error) {
var fh *os.File
Expand All @@ -127,8 +105,6 @@ func ProcNetTcp(input []byte) (rows []NetTcp, err error) {
for scanner.Scan() {
nLine++
line := scanner.Text()
// fmt.Printf("%d %s\n", nLine, line)
// skip first line
row := NetTcp{}
row.Fields = strings.Fields(line)

Expand All @@ -145,7 +121,6 @@ func ProcNetTcp(input []byte) (rows []NetTcp, err error) {
row.RemoteHost, row.RemotePort, err = ParseHexAddr(row.Fields[2])
v, _ := strconv.ParseUint(row.Fields[3], 16, 8)
row.St = int(v)
// fmt.Printf("row %+v\n", row)
rows = append(rows, row)
}
return rows, nil
Expand Down
33 changes: 19 additions & 14 deletions dbm-services/mongo/db-tools/dbmon/pkg/sendwarning/bkmonitorbeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ package sendwarning
// 消息有两类,Event事件消息和TimeSeries时序消息
import (
"dbm-services/mongo/db-tools/dbmon/mylog"
"dbm-services/mongo/db-tools/mongo-toolkit-go/pkg/mycmd"
"encoding/json"
"fmt"
"strings"
Expand Down Expand Up @@ -68,15 +69,16 @@ func (bm *BkMonitorEventSender) SendEventMsg(dataId int64, token string, eventNa
bm.SetEventCreateTime()

tempBytes, _ := json.Marshal(bm)
sendCmd := fmt.Sprintf(
`%s -report -report.bk_data_id %d -report.type agent -report.message.kind event -report.agent.address %s -report.message.body '%s'`,
bm.ToolBkMonitorBeat, bm.DataID, bm.AgentAddress, string(tempBytes))
mylog.Logger.Info(sendCmd)
_, err = util.RunBashCmdNoLog(sendCmd, "", nil, 20*time.Second)
if err != nil {
return
}
return nil
sendCmd := mycmd.New(bm.ToolBkMonitorBeat,
"-report", "-report.bk_data_id", bm.DataID,
"-report.type", "agent",
"-report.message.kind", "event",
"-report.agent.address", bm.AgentAddress,
"-report.message.body", string(tempBytes))

mylog.Logger.Info(sendCmd.GetCmdLine("", false))
_, err = sendCmd.Run2(20 * time.Second)
return
}

// SendTimeSeriesMsg dbmon心跳上报. "mongo_dbmon_heart_beat"
Expand All @@ -92,11 +94,14 @@ func (bm *BkMonitorEventSender) SendTimeSeriesMsg(dataId int64, token string, ta
metrics[metricName] = val
bm.Data[0].Metrics = metrics
tempBytes, _ := json.Marshal(bm)
sendCmd := fmt.Sprintf(
`%s -report -report.bk_data_id %d -report.type agent -report.message.kind timeseries -report.agent.address %s -report.message.body '%s'`,
bm.ToolBkMonitorBeat, bm.DataID, bm.AgentAddress, string(tempBytes))
mylog.Logger.Info(sendCmd)
_, err = util.RunBashCmdNoLog(sendCmd, "", nil, 20*time.Second)
sendCmd := mycmd.New(bm.ToolBkMonitorBeat,
"-report", "-report.bk_data_id", bm.DataID,
"-report.type", "agent",
"-report.message.kind", "timeseries",
"-report.agent.address", bm.AgentAddress,
"-report.message.body", string(tempBytes))
mylog.Logger.Info(sendCmd.GetCmdLine("", false))
_, err = sendCmd.Run2(20 * time.Second)
return
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ func _uploadFileOnce(file, tag string) (*TaskInfo, error) {
return nil, fmt.Errorf("BackupClient %s is not exists", BackupClient)
}
fileSize, _ := util.GetFileSize(absPath)
timeoutSecond := fileSize/1024/1024/1024 + 60 // --with-md5 会计算md5,所以超时时间要加长一点
timeoutSecond := fileSize/1024/1024/100 + 180 // --with-md5 会计算md5. 每秒100M
outBuf, errBuffer, err := DoCommand(timeoutSecond, BackupClient, "-n", "-f", absPath, "--with-md5", "-t", tag)
if err != nil {
return nil, err
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,11 @@ func ParseMongoVersion(version string) (*MongoVersion, error) {
if len(versionArray) != 3 {
return nil, fmt.Errorf("bad version:%s", version)
}
major, err := strconv.ParseInt(versionArray[0], 10, 64)
major, err := strconv.ParseInt(versionArray[0], 10, 32)
if err != nil {
return nil, fmt.Errorf("bad version:%s", version)
}
minor, err := strconv.ParseInt(versionArray[1], 10, 64)
minor, err := strconv.ParseInt(versionArray[1], 10, 32)
if err != nil {
return nil, fmt.Errorf("bad version:%s", version)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ func DecodeFileV0INCR(filename string) (*BackupFileName, error) {
return nil, fmt.Errorf("bad format: LastTs (%s %s)err:%v", fields[fn-1], "0", err)
}

if vv, err := strconv.ParseUint(fields[fn-2], 10, 64); err != nil {
if vv, err := strconv.ParseUint(fields[fn-2], 10, 32); err != nil {
return nil, fmt.Errorf("bad format: V0IncrSeq:%v", err)
} else {
bfn.V0IncrSeq = uint32(vv)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,20 +206,25 @@ func (b *BackupMetaV2) GetMetaFileName() string {
return path.Join(b.MetaDir, MetaFileName)
}

func parseUint32(s string) (uint32, error) {
v, err := strconv.ParseUint(s, 10, 32)
return uint32(v), err

}
func splitTs(ts string) (*TS, error) {
fs := strings.Split(ts, "|")
if len(fs) != 2 {
return nil, fmt.Errorf("splitTs failed, ts: %s", ts)
}
ts1, err := strconv.ParseInt(fs[0], 10, 64)
ts1, err := parseUint32(fs[0])
if err != nil {
return nil, fmt.Errorf("splitTs failed, ts: %s, err: %v", ts, err)
}
ts2, err := strconv.ParseInt(fs[1], 10, 64)
ts2, err := parseUint32(fs[1])
if err != nil {
return nil, fmt.Errorf("splitTs failed, ts: %s, err: %v", ts, err)
}
return &TS{uint32(ts1), uint32(ts2)}, nil
return &TS{ts1, ts2}, nil
}

func parseOplogPosLine(line string) (row *BackupFileName, err error) {
Expand Down

0 comments on commit 1379102

Please sign in to comment.