Skip to content

Commit

Permalink
Merge pull request #202 from fanhaouu/refactor-optimize-logic
Browse files Browse the repository at this point in the history
optimize some code logic
  • Loading branch information
antmoveh authored Mar 27, 2024
2 parents 25ceb94 + cb3fc54 commit ec196cd
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 46 deletions.
46 changes: 31 additions & 15 deletions cmd/carina-node/run/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,30 +19,32 @@ package run
import (
"context"
"errors"
"github.com/carina-io/carina"
"github.com/carina-io/carina/runners"
"os"
"time"

"github.com/container-storage-interface/spec/lib/go/csi"
"google.golang.org/grpc"
corev1 "k8s.io/api/core/v1"
storagev1 "k8s.io/api/storage/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"os"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/cache"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/log/zap"
"sigs.k8s.io/controller-runtime/pkg/metrics"
"time"

carinav1beta1 "github.com/carina-io/carina/api/v1beta1"

carinav1 "github.com/carina-io/carina/api/v1"
carinav1beta1 "github.com/carina-io/carina/api/v1beta1"
"github.com/carina-io/carina/controllers"
"github.com/carina-io/carina/pkg/csidriver/driver"
"github.com/carina-io/carina/pkg/csidriver/driver/k8s"
deviceManager "github.com/carina-io/carina/pkg/devicemanager"
carinaMetrics "github.com/carina-io/carina/pkg/metrics"
"github.com/container-storage-interface/spec/lib/go/csi"
"google.golang.org/grpc"
"k8s.io/apimachinery/pkg/runtime"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/log/zap"
"github.com/carina-io/carina/runners"
// +kubebuilder:scaffold:imports
)

Expand Down Expand Up @@ -70,15 +72,29 @@ func subMain() error {
mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{
Scheme: scheme,
MetricsBindAddress: config.metricsAddr,
LeaderElection: false,
NewCache: cache.BuilderWithOptions(cache.Options{
Scheme: scheme,
SelectorsByObject: cache.SelectorsByObject{
&corev1.Node{}: {
Field: fields.SelectorFromSet(fields.Set{"metadata.name": nodeName}),
},
&corev1.Pod{}: {
Field: fields.SelectorFromSet(fields.Set{"spec.nodeName": nodeName}),
},
&carinav1beta1.NodeStorageResource{}: {
Field: fields.SelectorFromSet(fields.Set{"metadata.name": nodeName}),
},
},
}),
LeaderElection: false,
})
if err != nil {
setupLog.Error(err, "unable to start manager")
return err
}

// 初始化磁盘管理服务
dm := deviceManager.NewDeviceManager(nodeName, mgr.GetCache(), mgr.GetClient())
dm := deviceManager.NewDeviceManager(nodeName, mgr.GetCache())

// pod io controller
podIOController := controllers.NewPodIOReconciler(
Expand Down
44 changes: 20 additions & 24 deletions pkg/csidriver/filesystem/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,12 @@ import (
"os"
"os/exec"
"path/filepath"
"regexp"
"strings"

"github.com/carina-io/carina/utils/log"

"golang.org/x/sys/unix"
"k8s.io/utils/io"

"github.com/carina-io/carina/utils/log"
)

const (
Expand All @@ -44,9 +44,16 @@ func isSameDevice(dev1, dev2 string) (bool, error) {

var st1, st2 unix.Stat_t
if err := Stat(dev1, &st1); err != nil {
if os.IsNotExist(err) {
return false, nil
}
return false, fmt.Errorf("stat failed for %s: %v", dev1, err)
}

if err := Stat(dev2, &st2); err != nil {
if os.IsNotExist(err) {
return false, nil
}
return false, fmt.Errorf("stat failed for %s: %v", dev2, err)
}

Expand All @@ -60,12 +67,13 @@ func IsMounted(device, target string) (bool, error) {
if err != nil {
return false, err
}

target, err = filepath.EvalSymlinks(abs)
if err != nil {
return false, err
}

data, err := os.ReadFile("/proc/mounts")
data, err := io.ConsistentRead("/proc/mounts", 3)
if err != nil {
return false, fmt.Errorf("could not read /proc/mounts: %v", err)
}
Expand All @@ -75,41 +83,29 @@ func IsMounted(device, target string) (bool, error) {
if len(fields) < 2 {
continue
}
//Intercept characters to determine that they belong to the same pod
podstr, err := getOneStringByRegex(fields[1], `/pods/([\w-]+)/`)

// If the filesystem is nfs(cephfs、ussfs etc) and its connection is broken, EvalSymlinks will be stuck.
// So it should be in before calling EvalSymlinks.
ok, err := isSameDevice(device, fields[0])
if err != nil {
return false, fmt.Errorf("could not read pods mountpath %s : %v", fields[1], err)
return false, err
}
if !strings.Contains(target, podstr) {
if !ok {
continue
}

d, err := filepath.EvalSymlinks(fields[1])
if err != nil {
return false, err
}
if d == target {
return isSameDevice(device, fields[0])
return true, nil
}
}

return false, nil
}

func getOneStringByRegex(str, rule string) (string, error) {
if !strings.Contains(str, "/pods/") {
return "non-csi", nil
}
reg, err := regexp.Compile(rule)
if reg == nil || err != nil {
return "", fmt.Errorf("regexp compile:" + err.Error())
}
result := reg.FindStringSubmatch(str)
if len(result) < 1 {
return "", fmt.Errorf("could not find sub str: %v", str)
}
return result[1], nil
}

// DetectFilesystem returns filesystem type if device has a filesystem.
// This returns an empty string if no filesystem exists.
func DetectFilesystem(device string) (string, error) {
Expand Down
13 changes: 6 additions & 7 deletions pkg/devicemanager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,20 @@ package deviceManager

import (
"context"
"github.com/carina-io/carina/pkg/devicemanager/bcache"
"time"

corev1 "k8s.io/api/core/v1"
"sigs.k8s.io/controller-runtime/pkg/cache"
"sigs.k8s.io/controller-runtime/pkg/client"

"github.com/carina-io/carina/pkg/configuration"
"github.com/carina-io/carina/pkg/devicemanager/bcache"
"github.com/carina-io/carina/pkg/devicemanager/lvmd"
"github.com/carina-io/carina/pkg/devicemanager/partition"
"github.com/carina-io/carina/pkg/devicemanager/volume"
"github.com/carina-io/carina/utils/exec"
"github.com/carina-io/carina/utils/log"
"github.com/carina-io/carina/utils/mutx"
corev1 "k8s.io/api/core/v1"
"sigs.k8s.io/controller-runtime/pkg/cache"
"sigs.k8s.io/controller-runtime/pkg/client"
)

type Trigger string
Expand All @@ -51,7 +52,6 @@ type VolumeEvent struct {

type DeviceManager struct {
Cache cache.Cache
client.Client
// Volume 操作
VolumeManager volume.LocalVolume
//磁盘以及分区操作
Expand All @@ -60,13 +60,12 @@ type DeviceManager struct {
noticeUpdates []chan *VolumeEvent
}

func NewDeviceManager(nodeName string, cache cache.Cache, client client.Client) *DeviceManager {
func NewDeviceManager(nodeName string, cache cache.Cache) *DeviceManager {
executor := &exec.CommandExecutor{}
mutex := mutx.NewGlobalLocks()

dm := DeviceManager{
Cache: cache,
Client: client,
VolumeManager: &volume.LocalVolumeImplement{Mutex: mutex, Lv: &lvmd.Lvm2Implement{Executor: executor}, Bcache: &bcache.BcacheImplement{Executor: executor}},
Partition: &partition.LocalPartitionImplement{Mutex: mutex, CacheParttionNum: make(map[string]uint), Executor: executor},
NodeName: nodeName,
Expand Down

0 comments on commit ec196cd

Please sign in to comment.