diff --git a/dbm-services/mongo/db-tools/dbactuator/pkg/atomjobs/atommongodb/restore.go b/dbm-services/mongo/db-tools/dbactuator/pkg/atomjobs/atommongodb/restore.go index bacf123e09..f4b3d947e8 100644 --- a/dbm-services/mongo/db-tools/dbactuator/pkg/atomjobs/atommongodb/restore.go +++ b/dbm-services/mongo/db-tools/dbactuator/pkg/atomjobs/atommongodb/restore.go @@ -10,17 +10,25 @@ import ( "fmt" "os" "path" + "path/filepath" "github.com/go-playground/validator/v10" "github.com/pkg/errors" "go.mongodb.org/mongo-driver/mongo" ) -// 备份 -// 1. 分析参数,确定要备份的库和表 -// 2. 执行备份 -// 3. 上报备份记录 -// 4. 上报到备份系统,等待备份系统完成 +// BsTaskArg 备份任务,作为参数传入 +type BsTaskArg struct { + TaskId string `json:"task_id"` + FileName string `json:"file_name"` +} + +// 回档 +// 1. 将备份文件解压到临时目录. +// 2. 检查目标MongoDB中,没有要恢复的库和表. +// 3. 执行恢复,并将恢复日志写入到文件中. +// 4. 检查恢复日志,如果有错误,返回错误. +// 5. 删除临时目录 // restoreParam 备份任务参数,由前端传入 type restoreParam struct { @@ -31,10 +39,11 @@ type restoreParam struct { InstanceType string `json:"instanceType"` Args struct { - SrcFile string `json:"srcFile"` - IsPartial bool `json:"isPartial"` // 为true时,备份指定库和表 - Oplog bool `json:"oplog"` // 是否备份oplog,只有在IsPartial为false可为true - NsFilter NsFilterArg `json:"nsFilter"` + RecoverDir string `json:"RecoverDir"` // /data/dbbak/recover_mg/ + SrcFile []BsTaskArg `json:"srcFile"` // 目前只需要1个文件,但是为了兼容,还是使用数组. + IsPartial bool `json:"isPartial"` // 为true时,备份指定库和表 + Oplog bool `json:"oplog"` // 是否备份oplog,只有在IsPartial为false可为true + NsFilter NsFilterArg `json:"nsFilter"` } `json:"Args"` } @@ -102,17 +111,18 @@ func (s *restoreJob) doLogicalRestore() error { helper := logical.NewMongoRestoreHelper(s.MongoInst, s.MongoRestoreBin, s.param.AdminUsername, s.param.AdminPassword, "admin", s.OsUser) - fileSize, err := util.GetFileSize(s.param.Args.SrcFile) + srcFilePath := filepath.Join(s.param.Args.RecoverDir, s.param.Args.SrcFile[0].FileName) + fileSize, err := util.GetFileSize(srcFilePath) if err != nil { return errors.Wrap(err, "GetFileSize") } - log.Info("start untar file %s, fileSize %d", s.param.Args.SrcFile, fileSize) - dstDir, err := logical.UntarFile(s.param.Args.SrcFile) + log.Info("start untar file %s, fileSize %d", srcFilePath, fileSize) + dstDir, err := logical.UntarFile(srcFilePath) if err != nil { return errors.Wrap(err, "UntarFile") } - log.Info("end untar file %s, dstDir %s", s.param.Args.SrcFile, dstDir) + log.Info("end untar file %s, dstDir %s", srcFilePath, dstDir) dstDirWithDump := path.Join(dstDir, "dump") // get DbCollection from Dir dbColList, err := logical.GetDbCollectionFromDir(dstDirWithDump) @@ -230,6 +240,11 @@ func (s *restoreJob) Init(runtime *jobruntime.JobGenericRuntime) error { return err } + if util.FileExists(s.param.Args.RecoverDir) == false { + return errors.New("recover dir is empty") + } + + // prepare mongo client and mongodump path s.MongoInst = mymongo.NewMongoHost( s.param.IP, fmt.Sprintf("%d", s.param.Port), "admin", s.param.AdminUsername, s.param.AdminPassword, "", s.param.IP) diff --git a/dbm-services/mongo/db-tools/dbactuator/pkg/util/file.go b/dbm-services/mongo/db-tools/dbactuator/pkg/util/file.go index 75d0310b21..23eac82ea2 100644 --- a/dbm-services/mongo/db-tools/dbactuator/pkg/util/file.go +++ b/dbm-services/mongo/db-tools/dbactuator/pkg/util/file.go @@ -9,7 +9,7 @@ import ( "os" ) -// FileExists 检查目录是否已经存在 +// FileExists 检查path是否已经存在 func FileExists(path string) bool { _, err := os.Stat(path) if err != nil { @@ -18,6 +18,15 @@ func FileExists(path string) bool { return true } +// DirExists 检查目录是否已经存在 +func DirExists(path string) bool { + f, err := os.Stat(path) + if err != nil { + return os.IsExist(err) + } + return f.IsDir() +} + // GetFileMd5 求文件md5sum值 func GetFileMd5(fileAbPath string) (md5sum string, err error) { rFile, err := os.Open(fileAbPath) diff --git a/dbm-services/mongo/db-tools/dbmon/cmd/mongojob/backup_task.go b/dbm-services/mongo/db-tools/dbmon/cmd/mongojob/backup_task.go index e23d4c5222..01b034d60f 100644 --- a/dbm-services/mongo/db-tools/dbmon/cmd/mongojob/backup_task.go +++ b/dbm-services/mongo/db-tools/dbmon/cmd/mongojob/backup_task.go @@ -9,7 +9,7 @@ import ( "time" ) -// BackupTaskOption TODO +// BackupTaskOption 备份任务参数 type BackupTaskOption struct { // TaskName 任务名称 TaskName string `json:"task_name"` @@ -28,7 +28,7 @@ type BackupTaskOption struct { Labels string `json:"labels"` } -// BackupTask TODO +// BackupTask 备份任务 type BackupTask struct { } @@ -38,6 +38,9 @@ func NewBackupTask() *BackupTask { } // Do 执行任务 +// 组装命令行,调用MongoToolKit执行备份任务,返回错误 +// 调用MongoToolKit执行备份任务的原因是,MongoToolKit已经实现了备份的逻辑,不需要重复实现 +// 也可实现备份时可重启dbmon,但目前没有实现 func (task *BackupTask) Do(option *BackupTaskOption) error { backupType := "AUTO" @@ -53,14 +56,15 @@ func (task *BackupTask) Do(option *BackupTaskOption) error { if option.SendToBs { cb.Append("--send-to-bs") } + if option.RemoveOldFileFirst { cb.Append("--remove-old-file-first") } - cmdLine := cb.GetCmdLine("", false) + // dbmon的日志不上传Es,可以打印密码. + cmdLine := cb.GetCmdLine2(false) mylog.Logger.Info(fmt.Sprintf("cmdLine: %s", cmdLine)) - // cmd, args := cb.GetCmd() o, err := cb.Run2(time.Hour * 24) mylog.Logger.Info( fmt.Sprintf("Exec %s cost %0.1f Seconds, stdout: %s, stderr %s", diff --git a/dbm-services/mongo/db-tools/dbmon/config/config.go b/dbm-services/mongo/db-tools/dbmon/config/config.go index 391db43852..f22335d707 100644 --- a/dbm-services/mongo/db-tools/dbmon/config/config.go +++ b/dbm-services/mongo/db-tools/dbmon/config/config.go @@ -23,7 +23,7 @@ type BkDbmLabel struct { App string `json:"app" mapstructure:"app" yaml:"app"` AppName string `json:"app_name" mapstructure:"-" yaml:"app_name"` ClusterDomain string `json:"cluster_domain" mapstructure:"cluster_domain" yaml:"cluster_domain"` - ClusterId string `json:"cluster_id" mapstructure:"cluster_id" yaml:"cluster_id"` + ClusterId int64 `json:"cluster_id" mapstructure:"cluster_id" yaml:"cluster_id"` ClusterName string `json:"cluster_name" mapstructure:"cluster_name" yaml:"cluster_name"` ClusterType string `json:"cluster_type" mapstructure:"cluster_type" yaml:"cluster_type"` RoleType string `json:"role_type" mapstructure:"role_type" yaml:"role_type"` // shardsvr,mongos,configsvr diff --git a/dbm-services/mongo/db-tools/dbmon/pkg/consts/consts.go b/dbm-services/mongo/db-tools/dbmon/pkg/consts/consts.go index a6909d5fdd..d38cdf9a90 100644 --- a/dbm-services/mongo/db-tools/dbmon/pkg/consts/consts.go +++ b/dbm-services/mongo/db-tools/dbmon/pkg/consts/consts.go @@ -9,11 +9,11 @@ const ( const ( // MongoTypeShardedCluster TODO - MongoTypeShardedCluster = "ShardedCluster" + MongoTypeShardedCluster = "MongoShardedCluster" // MongoTypeReplicaSet TODO - MongoTypeReplicaSet = "ReplicaSet" + MongoTypeReplicaSet = "MongoReplicaSet" // MongoTypeStandalone TODO - MongoTypeStandalone = "Standalone" + MongoTypeStandalone = "MongoStandalone" ) // time layout diff --git a/dbm-services/mongo/db-tools/dbmon/pkg/linuxproc/net_tcp.go b/dbm-services/mongo/db-tools/dbmon/pkg/linuxproc/net_tcp.go index 1471529aee..77f7e41756 100644 --- a/dbm-services/mongo/db-tools/dbmon/pkg/linuxproc/net_tcp.go +++ b/dbm-services/mongo/db-tools/dbmon/pkg/linuxproc/net_tcp.go @@ -70,10 +70,9 @@ func ParseHexAddr(host string) (ip string, port int, err error) { if err != nil { return "", 0, err } - ip = InetNtoA(n) - n2, err := strconv.ParseUint(fs[1], 16, 32) + n2, err := strconv.ParseInt(fs[1], 16, 32) if err != nil { return "", 0, err } @@ -87,27 +86,6 @@ const ProcNetTcpPath = "/proc/net/tcp" const ESTABLISHED = 1 const LISTEN = 10 -/* - ProcNetTcp -读取/proc/net/tcp文件 -sl local_address rem_address st tx_queue rx_queue tr tm->when retrnsmt uid timeout inode -0: 0100007F:1B1E 00000000:0000 0A 00000000:00000000 00:00000000 - -- 00000000 0 0 442153839 1 ffff880101899500 100 0 0 10 0 - TCP_ESTABLISHED(1), - TCP_SYN_SENT, - TCP_SYN_RECV, - TCP_FIN_WAIT1, - TCP_FIN_WAIT2, - TCP_TIME_WAIT, - TCP_CLOSE, - TCP_CLOSE_WAIT, - TCP_LAST_ACK, - TCP_LISTEN, - TCP_CLOSING, Now a valid state - TCP_NEW_SYN_RECV, - TCP_MAX_STATES Leave at the end! -*/ - // ProcNetTcp 读取/proc/net/tcp文件 func ProcNetTcp(input []byte) (rows []NetTcp, err error) { var fh *os.File @@ -127,8 +105,6 @@ func ProcNetTcp(input []byte) (rows []NetTcp, err error) { for scanner.Scan() { nLine++ line := scanner.Text() - // fmt.Printf("%d %s\n", nLine, line) - // skip first line row := NetTcp{} row.Fields = strings.Fields(line) @@ -145,7 +121,6 @@ func ProcNetTcp(input []byte) (rows []NetTcp, err error) { row.RemoteHost, row.RemotePort, err = ParseHexAddr(row.Fields[2]) v, _ := strconv.ParseUint(row.Fields[3], 16, 8) row.St = int(v) - // fmt.Printf("row %+v\n", row) rows = append(rows, row) } return rows, nil diff --git a/dbm-services/mongo/db-tools/dbmon/pkg/sendwarning/bkmonitorbeat.go b/dbm-services/mongo/db-tools/dbmon/pkg/sendwarning/bkmonitorbeat.go index c1b7f1c5cb..ebf2bbe952 100644 --- a/dbm-services/mongo/db-tools/dbmon/pkg/sendwarning/bkmonitorbeat.go +++ b/dbm-services/mongo/db-tools/dbmon/pkg/sendwarning/bkmonitorbeat.go @@ -4,6 +4,7 @@ package sendwarning // 消息有两类,Event事件消息和TimeSeries时序消息 import ( "dbm-services/mongo/db-tools/dbmon/mylog" + "dbm-services/mongo/db-tools/mongo-toolkit-go/pkg/mycmd" "encoding/json" "fmt" "strings" @@ -68,15 +69,16 @@ func (bm *BkMonitorEventSender) SendEventMsg(dataId int64, token string, eventNa bm.SetEventCreateTime() tempBytes, _ := json.Marshal(bm) - sendCmd := fmt.Sprintf( - `%s -report -report.bk_data_id %d -report.type agent -report.message.kind event -report.agent.address %s -report.message.body '%s'`, - bm.ToolBkMonitorBeat, bm.DataID, bm.AgentAddress, string(tempBytes)) - mylog.Logger.Info(sendCmd) - _, err = util.RunBashCmdNoLog(sendCmd, "", nil, 20*time.Second) - if err != nil { - return - } - return nil + sendCmd := mycmd.New(bm.ToolBkMonitorBeat, + "-report", "-report.bk_data_id", bm.DataID, + "-report.type", "agent", + "-report.message.kind", "event", + "-report.agent.address", bm.AgentAddress, + "-report.message.body", string(tempBytes)) + + mylog.Logger.Info(sendCmd.GetCmdLine("", false)) + _, err = sendCmd.Run2(20 * time.Second) + return } // SendTimeSeriesMsg dbmon心跳上报. "mongo_dbmon_heart_beat" @@ -92,11 +94,14 @@ func (bm *BkMonitorEventSender) SendTimeSeriesMsg(dataId int64, token string, ta metrics[metricName] = val bm.Data[0].Metrics = metrics tempBytes, _ := json.Marshal(bm) - sendCmd := fmt.Sprintf( - `%s -report -report.bk_data_id %d -report.type agent -report.message.kind timeseries -report.agent.address %s -report.message.body '%s'`, - bm.ToolBkMonitorBeat, bm.DataID, bm.AgentAddress, string(tempBytes)) - mylog.Logger.Info(sendCmd) - _, err = util.RunBashCmdNoLog(sendCmd, "", nil, 20*time.Second) + sendCmd := mycmd.New(bm.ToolBkMonitorBeat, + "-report", "-report.bk_data_id", bm.DataID, + "-report.type", "agent", + "-report.message.kind", "timeseries", + "-report.agent.address", bm.AgentAddress, + "-report.message.body", string(tempBytes)) + mylog.Logger.Info(sendCmd.GetCmdLine("", false)) + _, err = sendCmd.Run2(20 * time.Second) return } diff --git a/dbm-services/mongo/db-tools/mongo-toolkit-go/pkg/backupsys/backup_client.go b/dbm-services/mongo/db-tools/mongo-toolkit-go/pkg/backupsys/backup_client.go index 4bb70f6f11..9d73520c6b 100644 --- a/dbm-services/mongo/db-tools/mongo-toolkit-go/pkg/backupsys/backup_client.go +++ b/dbm-services/mongo/db-tools/mongo-toolkit-go/pkg/backupsys/backup_client.go @@ -98,7 +98,7 @@ func _uploadFileOnce(file, tag string) (*TaskInfo, error) { return nil, fmt.Errorf("BackupClient %s is not exists", BackupClient) } fileSize, _ := util.GetFileSize(absPath) - timeoutSecond := fileSize/1024/1024/1024 + 60 // --with-md5 会计算md5,所以超时时间要加长一点 + timeoutSecond := fileSize/1024/1024/100 + 180 // --with-md5 会计算md5. 每秒100M outBuf, errBuffer, err := DoCommand(timeoutSecond, BackupClient, "-n", "-f", absPath, "--with-md5", "-t", tag) if err != nil { return nil, err diff --git a/dbm-services/mongo/db-tools/mongo-toolkit-go/pkg/mymongo/version.go b/dbm-services/mongo/db-tools/mongo-toolkit-go/pkg/mymongo/version.go index f076b33841..933472cf8b 100644 --- a/dbm-services/mongo/db-tools/mongo-toolkit-go/pkg/mymongo/version.go +++ b/dbm-services/mongo/db-tools/mongo-toolkit-go/pkg/mymongo/version.go @@ -24,11 +24,11 @@ func ParseMongoVersion(version string) (*MongoVersion, error) { if len(versionArray) != 3 { return nil, fmt.Errorf("bad version:%s", version) } - major, err := strconv.ParseInt(versionArray[0], 10, 64) + major, err := strconv.ParseInt(versionArray[0], 10, 32) if err != nil { return nil, fmt.Errorf("bad version:%s", version) } - minor, err := strconv.ParseInt(versionArray[1], 10, 64) + minor, err := strconv.ParseInt(versionArray[1], 10, 32) if err != nil { return nil, fmt.Errorf("bad version:%s", version) } diff --git a/dbm-services/mongo/db-tools/mongo-toolkit-go/toolkit/pitr/filename.go b/dbm-services/mongo/db-tools/mongo-toolkit-go/toolkit/pitr/filename.go index 6ae40d8e3e..a19c4a7dfa 100644 --- a/dbm-services/mongo/db-tools/mongo-toolkit-go/toolkit/pitr/filename.go +++ b/dbm-services/mongo/db-tools/mongo-toolkit-go/toolkit/pitr/filename.go @@ -211,7 +211,7 @@ func DecodeFileV0INCR(filename string) (*BackupFileName, error) { return nil, fmt.Errorf("bad format: LastTs (%s %s)err:%v", fields[fn-1], "0", err) } - if vv, err := strconv.ParseUint(fields[fn-2], 10, 64); err != nil { + if vv, err := strconv.ParseUint(fields[fn-2], 10, 32); err != nil { return nil, fmt.Errorf("bad format: V0IncrSeq:%v", err) } else { bfn.V0IncrSeq = uint32(vv) diff --git a/dbm-services/mongo/db-tools/mongo-toolkit-go/toolkit/pitr/metav2.go b/dbm-services/mongo/db-tools/mongo-toolkit-go/toolkit/pitr/metav2.go index ad02e46d1f..bcb55314f5 100644 --- a/dbm-services/mongo/db-tools/mongo-toolkit-go/toolkit/pitr/metav2.go +++ b/dbm-services/mongo/db-tools/mongo-toolkit-go/toolkit/pitr/metav2.go @@ -206,20 +206,25 @@ func (b *BackupMetaV2) GetMetaFileName() string { return path.Join(b.MetaDir, MetaFileName) } +func parseUint32(s string) (uint32, error) { + v, err := strconv.ParseUint(s, 10, 32) + return uint32(v), err + +} func splitTs(ts string) (*TS, error) { fs := strings.Split(ts, "|") if len(fs) != 2 { return nil, fmt.Errorf("splitTs failed, ts: %s", ts) } - ts1, err := strconv.ParseInt(fs[0], 10, 64) + ts1, err := parseUint32(fs[0]) if err != nil { return nil, fmt.Errorf("splitTs failed, ts: %s, err: %v", ts, err) } - ts2, err := strconv.ParseInt(fs[1], 10, 64) + ts2, err := parseUint32(fs[1]) if err != nil { return nil, fmt.Errorf("splitTs failed, ts: %s, err: %v", ts, err) } - return &TS{uint32(ts1), uint32(ts2)}, nil + return &TS{ts1, ts2}, nil } func parseOplogPosLine(line string) (row *BackupFileName, err error) {