diff --git a/pkg/workceptor/kubernetes.go b/pkg/workceptor/kubernetes.go
index aae1435d1..35b5847ad 100644
--- a/pkg/workceptor/kubernetes.go
+++ b/pkg/workceptor/kubernetes.go
@@ -10,6 +10,7 @@ import (
 	"fmt"
 	"io"
 	"net"
+	"net/url"
 	"os"
 	"strconv"
 	"strings"
@@ -54,7 +55,7 @@ type kubeUnit struct {
 }
 
 // kubeExtraData is the content of the ExtraData JSON field for a Kubernetes worker.
-type kubeExtraData struct {
+type KubeExtraData struct {
 	Image         string
 	Command       string
 	Params        string
@@ -64,6 +65,109 @@ type kubeExtraData struct {
 	PodName       string
 }
 
+type KubeAPIer interface {
+	NewNotFound(qualifiedResource schema.GroupResource, name string) *apierrors.StatusError
+	OneTermEqualSelector(k string, v string) fields.Selector
+	NewForConfig(c *rest.Config) (*kubernetes.Clientset, error)
+	GetLogs(clientset *kubernetes.Clientset, namespace string, name string, opts *corev1.PodLogOptions) *rest.Request
+	Get(clientset *kubernetes.Clientset, namespace string, ctx context.Context, name string, opts metav1.GetOptions) (*corev1.Pod, error)
+	Create(clientset *kubernetes.Clientset, namespace string, ctx context.Context, pod *corev1.Pod, opts metav1.CreateOptions) (*corev1.Pod, error)
+	List(clientset *kubernetes.Clientset, namespace string, ctx context.Context, opts metav1.ListOptions) (*corev1.PodList, error)
+	Watch(clientset *kubernetes.Clientset, namespace string, ctx context.Context, opts metav1.ListOptions) (watch.Interface, error)
+	Delete(clientset *kubernetes.Clientset, namespace string, ctx context.Context, name string, opts metav1.DeleteOptions) error
+	SubResource(clientset *kubernetes.Clientset, podName string, podNamespace string) *rest.Request
+	InClusterConfig() (*rest.Config, error)
+	NewDefaultClientConfigLoadingRules() *clientcmd.ClientConfigLoadingRules
+	BuildConfigFromFlags(masterUrl string, kubeconfigPath string) (*rest.Config, error)
+	NewClientConfigFromBytes(configBytes []byte) (clientcmd.ClientConfig, error)
+	NewSPDYExecutor(config *rest.Config, method string, url *url.URL) (remotecommand.Executor, error)
+	StreamWithContext(exec remotecommand.Executor, ctx context.Context, options remotecommand.StreamOptions) error
+	UntilWithSync(ctx context.Context, lw cache.ListerWatcher, objType runtime.Object, precondition watch2.PreconditionFunc, conditions ...watch2.ConditionFunc) (*watch.Event, error)
+	NewFakeNeverRateLimiter() flowcontrol.RateLimiter
+	NewFakeAlwaysRateLimiter() flowcontrol.RateLimiter
+}
+
+type KubeAPIWrapper struct {
+}
+
+func (ku KubeAPIWrapper) NewNotFound(qualifiedResource schema.GroupResource, name string) *apierrors.StatusError {
+	return apierrors.NewNotFound(qualifiedResource, name)
+}
+
+func (ku KubeAPIWrapper) OneTermEqualSelector(k string, v string) fields.Selector {
+	return fields.OneTermEqualSelector(k, v)
+}
+
+func (ku KubeAPIWrapper) NewForConfig(c *rest.Config) (*kubernetes.Clientset, error) {
+	return kubernetes.NewForConfig(c)
+}
+
+func (ku KubeAPIWrapper) GetLogs(clientset *kubernetes.Clientset, namespace string, name string, opts *corev1.PodLogOptions) *rest.Request {
+	return clientset.CoreV1().Pods(namespace).GetLogs(name, opts)
+}
+
+func (ku KubeAPIWrapper) Get(clientset *kubernetes.Clientset, namespace string, ctx context.Context, name string, opts metav1.GetOptions) (*corev1.Pod, error) {
+	return clientset.CoreV1().Pods(namespace).Get(ctx, name, opts)
+}
+
+func (ku KubeAPIWrapper) Create(clientset *kubernetes.Clientset, namespace string, ctx context.Context, pod *corev1.Pod, opts metav1.CreateOptions) (*corev1.Pod, error) {
+	return clientset.CoreV1().Pods(namespace).Create(ctx, pod, opts)
+}
+
+func (ku KubeAPIWrapper) List(clientset *kubernetes.Clientset, namespace string, ctx context.Context, opts metav1.ListOptions) (*corev1.PodList, error) {
+	return clientset.CoreV1().Pods(namespace).List(ctx, opts)
+}
+
+func (ku KubeAPIWrapper) Watch(clientset *kubernetes.Clientset, namespace string, ctx context.Context, opts metav1.ListOptions) (watch.Interface, error) {
+	return clientset.CoreV1().Pods(namespace).Watch(ctx, opts)
+}
+
+func (ku KubeAPIWrapper) Delete(clientset *kubernetes.Clientset, namespace string, ctx context.Context, name string, opts metav1.DeleteOptions) error {
+	return clientset.CoreV1().Pods(namespace).Delete(ctx, name, opts)
+}
+
+func (ku KubeAPIWrapper) SubResource(clientset *kubernetes.Clientset, podName string, podNamespace string) *rest.Request {
+	return clientset.CoreV1().RESTClient().Post().Resource("pods").Name(podName).Namespace(podNamespace).SubResource("attach")
+}
+
+func (ku KubeAPIWrapper) InClusterConfig() (*rest.Config, error) {
+	return rest.InClusterConfig()
+}
+
+func (ku KubeAPIWrapper) NewDefaultClientConfigLoadingRules() *clientcmd.ClientConfigLoadingRules {
+	return clientcmd.NewDefaultClientConfigLoadingRules()
+}
+
+func (ku KubeAPIWrapper) BuildConfigFromFlags(masterUrl string, kubeconfigPath string) (*rest.Config, error) {
+	return clientcmd.BuildConfigFromFlags(masterUrl, kubeconfigPath)
+}
+
+func (ku KubeAPIWrapper) NewClientConfigFromBytes(configBytes []byte) (clientcmd.ClientConfig, error) {
+	return clientcmd.NewClientConfigFromBytes(configBytes)
+}
+
+func (ku KubeAPIWrapper) NewSPDYExecutor(config *rest.Config, method string, url *url.URL) (remotecommand.Executor, error) {
+	return remotecommand.NewSPDYExecutor(config, method, url)
+}
+
+func (ku KubeAPIWrapper) StreamWithContext(exec remotecommand.Executor, ctx context.Context, options remotecommand.StreamOptions) error {
+	return exec.StreamWithContext(ctx, options)
+}
+
+func (ku KubeAPIWrapper) UntilWithSync(ctx context.Context, lw cache.ListerWatcher, objType runtime.Object, precondition watch2.PreconditionFunc, conditions ...watch2.ConditionFunc) (*watch.Event, error) {
+	return watch2.UntilWithSync(ctx, lw, objType, precondition, conditions...)
+}
+
+func (ku KubeAPIWrapper) NewFakeNeverRateLimiter() flowcontrol.RateLimiter {
+	return flowcontrol.NewFakeNeverRateLimiter()
+}
+
+func (ku KubeAPIWrapper) NewFakeAlwaysRateLimiter() flowcontrol.RateLimiter {
+	return flowcontrol.NewFakeAlwaysRateLimiter()
+}
+
+var KubeAPIWrapperInstance KubeAPIer
+
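The interface and wrapper above form the package's single seam over client-go: production code always goes through KubeAPIWrapperInstance, and tests can install any KubeAPIer in its place. A minimal sketch of a hand-written test double, assuming only what this diff shows (the fakeKubeAPI type is hypothetical and for illustration; the tests in this change use the generated MockKubeAPIer instead):

package workceptor_test

import (
	"k8s.io/client-go/rest"

	"github.com/ansible/receptor/pkg/workceptor"
)

// fakeKubeAPI is a hypothetical test double. Embedding the concrete
// KubeAPIWrapper promotes all of its methods, which satisfies the full
// KubeAPIer interface; only the one call under test is shadowed.
type fakeKubeAPI struct {
	workceptor.KubeAPIWrapper
}

// InClusterConfig returns an empty config instead of requiring the test
// to run inside a Kubernetes pod.
func (f fakeKubeAPI) InClusterConfig() (*rest.Config, error) {
	return &rest.Config{}, nil
}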
 // ErrPodCompleted is returned when pod has already completed before we could attach.
 var ErrPodCompleted = fmt.Errorf("pod ran to completion")
 
@@ -78,7 +182,7 @@ func podRunningAndReady() func(event watch.Event) (bool, error) {
 	imagePullBackOffRetries := 3
 	inner := func(event watch.Event) (bool, error) {
 		if event.Type == watch.Deleted {
-			return false, apierrors.NewNotFound(schema.GroupResource{Resource: "pods"}, "")
+			return false, KubeAPIWrapperInstance.NewNotFound(schema.GroupResource{Resource: "pods"}, "")
 		}
 		if t, ok := event.Object.(*corev1.Pod); ok {
 			switch t.Status.Phase {
@@ -134,9 +238,7 @@ func (kw *kubeUnit) kubeLoggingConnectionHandler(timestamps bool, sinceTime time
 		podOptions.SinceTime = &metav1.Time{Time: sinceTime}
 	}
 
-	logReq := kw.clientset.CoreV1().Pods(podNamespace).GetLogs(
-		podName, podOptions,
-	)
+	logReq := KubeAPIWrapperInstance.GetLogs(kw.clientset, podNamespace, podName, podOptions)
 	// get logstream, with retry
 	for retries := 5; retries > 0; retries-- {
 		logStream, err = logReq.Stream(kw.GetContext())
@@ -207,7 +309,7 @@ func (kw *kubeUnit) kubeLoggingWithReconnect(streamWait *sync.WaitGroup, stdout
 
 	// get pod, with retry
 	for retries := 5; retries > 0; retries-- {
-		kw.pod, err = kw.clientset.CoreV1().Pods(podNamespace).Get(kw.GetContext(), podName, metav1.GetOptions{})
+		kw.pod, err = KubeAPIWrapperInstance.Get(kw.clientset, podNamespace, kw.GetContext(), podName, metav1.GetOptions{})
 		if err == nil {
 			break
 		}
@@ -278,7 +380,7 @@ func (kw *kubeUnit) kubeLoggingWithReconnect(streamWait *sync.WaitGroup, stdout
 			}
 
 			split := strings.SplitN(line, " ", 2)
-			timeStamp := parseTime(split[0])
+			timeStamp := ParseTime(split[0])
 			if !timeStamp.After(sinceTime) && !successfulWrite {
 				continue
 			}
@@ -301,7 +403,7 @@ func (kw *kubeUnit) kubeLoggingWithReconnect(streamWait *sync.WaitGroup, stdout
 }
 
 func (kw *kubeUnit) createPod(env map[string]string) error {
-	ked := kw.UnredactedStatus().ExtraData.(*kubeExtraData)
+	ked := kw.UnredactedStatus().ExtraData.(*KubeExtraData)
 	command, err := shlex.Split(ked.Command)
 	if err != nil {
 		return err
@@ -383,7 +485,7 @@ func (kw *kubeUnit) createPod(env map[string]string) error {
 	}
 
 	// get pod and store to kw.pod
-	kw.pod, err = kw.clientset.CoreV1().Pods(ked.KubeNamespace).Create(kw.GetContext(), pod, metav1.CreateOptions{})
+	kw.pod, err = KubeAPIWrapperInstance.Create(kw.clientset, ked.KubeNamespace, kw.GetContext(), pod, metav1.CreateOptions{})
 	if err != nil {
 		return err
 	}
@@ -398,21 +500,21 @@ func (kw *kubeUnit) createPod(env map[string]string) error {
 		status.State = WorkStatePending
 		status.Detail = "Pod created"
 		status.StdoutSize = 0
-		status.ExtraData.(*kubeExtraData).PodName = kw.pod.Name
+		status.ExtraData.(*KubeExtraData).PodName = kw.pod.Name
 	})
 
 	// Wait for the pod to be running
-	fieldSelector := fields.OneTermEqualSelector("metadata.name", kw.pod.Name).String()
+	fieldSelector := KubeAPIWrapperInstance.OneTermEqualSelector("metadata.name", kw.pod.Name).String()
 	lw := &cache.ListWatch{
 		ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
 			options.FieldSelector = fieldSelector
-			return kw.clientset.CoreV1().Pods(ked.KubeNamespace).List(kw.GetContext(), options)
+			return KubeAPIWrapperInstance.List(kw.clientset, ked.KubeNamespace, kw.GetContext(), options)
 		},
 		WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
 			options.FieldSelector = fieldSelector
-			return kw.clientset.CoreV1().Pods(ked.KubeNamespace).Watch(kw.GetContext(), options)
+			return KubeAPIWrapperInstance.Watch(kw.clientset, ked.KubeNamespace, kw.GetContext(), options)
 		},
 	}
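The hunk above is the standard client-go wait-for-ready idiom, now routed through the wrapper so a mock can stand in for the API server. For orientation, a condensed sketch of the same pattern written against plain client-go (the function and the clientset, namespace, and podName parameters are illustrative, not part of this change):

package example

import (
	"context"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/fields"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/watch"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/cache"
	watchtools "k8s.io/client-go/tools/watch"
)

// waitForPod blocks until the condition reports true for the named pod,
// the watch fails, or ctx is done.
func waitForPod(ctx context.Context, clientset *kubernetes.Clientset, namespace, podName string,
	condition func(watch.Event) (bool, error),
) (*watch.Event, error) {
	// Restrict the list/watch to the single pod we care about.
	fieldSelector := fields.OneTermEqualSelector("metadata.name", podName).String()
	lw := &cache.ListWatch{
		ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
			options.FieldSelector = fieldSelector
			return clientset.CoreV1().Pods(namespace).List(ctx, options)
		},
		WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
			options.FieldSelector = fieldSelector
			return clientset.CoreV1().Pods(namespace).Watch(ctx, options)
		},
	}

	// UntilWithSync lists first (sync), then watches, re-checking the
	// condition on every event for the selected pod.
	return watchtools.UntilWithSync(ctx, lw, &corev1.Pod{}, nil, condition)
}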
@@ -422,7 +524,7 @@ func (kw *kubeUnit) createPod(env map[string]string) error {
 	}
 	time.Sleep(2 * time.Second)
 
-	ev, err := watch2.UntilWithSync(ctxPodReady, lw, &corev1.Pod{}, nil, podRunningAndReady())
+	ev, err := KubeAPIWrapperInstance.UntilWithSync(ctxPodReady, lw, &corev1.Pod{}, nil, podRunningAndReady())
 	if ev == nil || ev.Object == nil {
 		return fmt.Errorf("did not return an event while watching pod for work unit %s", kw.ID())
 	}
@@ -491,7 +593,7 @@ func (kw *kubeUnit) runWorkUsingLogger() {
 	skipStdin := true
 
 	status := kw.Status()
-	ked := status.ExtraData.(*kubeExtraData)
+	ked := status.ExtraData.(*KubeExtraData)
 	podName := ked.PodName
 	podNamespace := ked.KubeNamespace
 
@@ -538,7 +640,7 @@ func (kw *kubeUnit) runWorkUsingLogger() {
 			default:
 			}
 
-			kw.pod, err = kw.clientset.CoreV1().Pods(podNamespace).Get(kw.GetContext(), podName, metav1.GetOptions{})
+			kw.pod, err = KubeAPIWrapperInstance.Get(kw.clientset, podNamespace, kw.GetContext(), podName, metav1.GetOptions{})
 			if err == nil {
 				break
 			}
@@ -563,11 +665,7 @@ func (kw *kubeUnit) runWorkUsingLogger() {
 	// Attach stdin stream to the pod
 	var exec remotecommand.Executor
 	if !skipStdin {
-		req := kw.clientset.CoreV1().RESTClient().Post().
-			Resource("pods").
-			Name(podName).
-			Namespace(podNamespace).
-			SubResource("attach")
+		req := KubeAPIWrapperInstance.SubResource(kw.clientset, podName, podNamespace)
 
 		req.VersionedParams(
 			&corev1.PodExecOptions{
@@ -579,9 +677,8 @@ func (kw *kubeUnit) runWorkUsingLogger() {
 			},
 			scheme.ParameterCodec,
 		)
 
-		var err error
-		exec, err = remotecommand.NewSPDYExecutor(kw.config, "POST", req.URL())
+		exec, err = KubeAPIWrapperInstance.NewSPDYExecutor(kw.config, "POST", req.URL())
 		if err != nil {
 			errMsg := fmt.Sprintf("Error creating SPDY executor: %s", err)
 			kw.UpdateBasicStatus(WorkStateFailed, errMsg, 0)
@@ -675,7 +772,7 @@ func (kw *kubeUnit) runWorkUsingLogger() {
 			var err error
 			for retries := 5; retries > 0; retries-- {
-				err = exec.StreamWithContext(kw.GetContext(), remotecommand.StreamOptions{
+				err = KubeAPIWrapperInstance.StreamWithContext(exec, kw.GetContext(), remotecommand.StreamOptions{
 					Stdin: stdin,
 					Tty:   false,
 				})
@@ -721,7 +818,7 @@ func (kw *kubeUnit) runWorkUsingLogger() {
 		}()
 	}
 
-	stdoutWithReconnect := shouldUseReconnect()
+	stdoutWithReconnect := ShouldUseReconnect()
 	if stdoutWithReconnect && stdoutErr == nil {
 		kw.GetWorkceptor().nc.GetLogger().Debug("streaming stdout with reconnect support")
 		go kw.kubeLoggingWithReconnect(&streamWait, stdout, &stdinErr, &stdoutErr)
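The next two hunks export shouldUseReconnect and parseTime so the external test package can call them. A sketch exercising both, assuming only what the comments and signatures in those hunks state (the test function itself is illustrative):

package workceptor_test

import (
	"os"
	"testing"

	"github.com/ansible/receptor/pkg/workceptor"
)

// TestExportedHelpers is an illustrative sketch of the two newly
// exported helpers.
func TestExportedHelpers(t *testing.T) {
	// Per the comment in ShouldUseReconnect, "disabled" overrides the
	// detection.
	os.Setenv("RECEPTOR_KUBE_SUPPORT_RECONNECT", "disabled")
	defer os.Unsetenv("RECEPTOR_KUBE_SUPPORT_RECONNECT")
	if workceptor.ShouldUseReconnect() {
		t.Error("expected reconnect streaming to be disabled")
	}

	// ParseTime returns a *time.Time on success and nil for input it
	// cannot parse as a timestamp.
	if ts := workceptor.ParseTime("2024-01-02T15:04:05Z"); ts == nil {
		t.Error("expected an RFC3339 timestamp to parse")
	}
	if ts := workceptor.ParseTime("not-a-time"); ts != nil {
		t.Error("expected a non-timestamp to return nil")
	}
}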
The default is "enabled" @@ -780,7 +877,7 @@ func shouldUseReconnect() bool { return true } -func parseTime(s string) *time.Time { +func ParseTime(s string) *time.Time { t, err := time.Parse(time.RFC3339, s) if err == nil { return &t @@ -973,10 +1070,10 @@ func (kw *kubeUnit) runWorkUsingTCP() { func (kw *kubeUnit) connectUsingKubeconfig() error { var err error - ked := kw.UnredactedStatus().ExtraData.(*kubeExtraData) + ked := kw.UnredactedStatus().ExtraData.(*KubeExtraData) if ked.KubeConfig == "" { - clr := clientcmd.NewDefaultClientConfigLoadingRules() - kw.config, err = clientcmd.BuildConfigFromFlags("", clr.GetDefaultFilename()) + clr := KubeAPIWrapperInstance.NewDefaultClientConfigLoadingRules() + kw.config, err = KubeAPIWrapperInstance.BuildConfigFromFlags("", clr.GetDefaultFilename()) if ked.KubeNamespace == "" { c, err := clr.Load() if err != nil { @@ -985,14 +1082,14 @@ func (kw *kubeUnit) connectUsingKubeconfig() error { curContext, ok := c.Contexts[c.CurrentContext] if ok && curContext != nil { kw.UpdateFullStatus(func(sfd *StatusFileData) { - sfd.ExtraData.(*kubeExtraData).KubeNamespace = curContext.Namespace + sfd.ExtraData.(*KubeExtraData).KubeNamespace = curContext.Namespace }) } else { return fmt.Errorf("could not determine namespace") } } } else { - cfg, err := clientcmd.NewClientConfigFromBytes([]byte(ked.KubeConfig)) + cfg, err := KubeAPIWrapperInstance.NewClientConfigFromBytes([]byte(ked.KubeConfig)) if err != nil { return err } @@ -1002,7 +1099,7 @@ func (kw *kubeUnit) connectUsingKubeconfig() error { return err } kw.UpdateFullStatus(func(sfd *StatusFileData) { - sfd.ExtraData.(*kubeExtraData).KubeNamespace = namespace + sfd.ExtraData.(*KubeExtraData).KubeNamespace = namespace }) } kw.config, err = cfg.ClientConfig() @@ -1019,7 +1116,7 @@ func (kw *kubeUnit) connectUsingKubeconfig() error { func (kw *kubeUnit) connectUsingIncluster() error { var err error - kw.config, err = rest.InClusterConfig() + kw.config, err = KubeAPIWrapperInstance.InClusterConfig() if err != nil { return err } @@ -1082,16 +1179,16 @@ func (kw *kubeUnit) connectToKube() error { if ok { switch envRateLimiter { case "never": - kw.config.RateLimiter = flowcontrol.NewFakeNeverRateLimiter() + kw.config.RateLimiter = KubeAPIWrapperInstance.NewFakeNeverRateLimiter() case "always": - kw.config.RateLimiter = flowcontrol.NewFakeAlwaysRateLimiter() + kw.config.RateLimiter = KubeAPIWrapperInstance.NewFakeAlwaysRateLimiter() default: } kw.GetWorkceptor().nc.GetLogger().Debug("RateLimiter: %s", envRateLimiter) } kw.GetWorkceptor().nc.GetLogger().Debug("QPS: %f, Burst: %d", kw.config.QPS, kw.config.Burst) - kw.clientset, err = kubernetes.NewForConfig(kw.config) + kw.clientset, err = KubeAPIWrapperInstance.NewForConfig(kw.config) if err != nil { return err } @@ -1114,7 +1211,7 @@ func readFileToString(filename string) (string, error) { // SetFromParams sets the in-memory state from parameters. func (kw *kubeUnit) SetFromParams(params map[string]string) error { - ked := kw.GetStatusCopy().ExtraData.(*kubeExtraData) + ked := kw.GetStatusCopy().ExtraData.(*KubeExtraData) type value struct { name string permission bool @@ -1203,7 +1300,7 @@ func (kw *kubeUnit) SetFromParams(params map[string]string) error { // Status returns a copy of the status currently loaded in memory. 
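The remaining hunks in this file swap the extra-data type assertions over to the exported KubeExtraData, which is what lets an external test package construct the status payloads. A sketch of that assertion from calling code (the test function and its values are illustrative):

package workceptor_test

import (
	"testing"

	"github.com/ansible/receptor/pkg/workceptor"
)

// TestExtraDataAssertion sketches the *KubeExtraData type assertion that
// the hunks below now use.
func TestExtraDataAssertion(t *testing.T) {
	status := workceptor.StatusFileData{
		ExtraData: &workceptor.KubeExtraData{KubeNamespace: "demo"},
	}

	ked, ok := status.ExtraData.(*workceptor.KubeExtraData)
	if !ok || ked.KubeNamespace != "demo" {
		t.Error("expected ExtraData to carry a *KubeExtraData")
	}
}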
@@ -1203,7 +1300,7 @@ func (kw *kubeUnit) SetFromParams(params map[string]string) error {
 
 // Status returns a copy of the status currently loaded in memory.
 func (kw *kubeUnit) Status() *StatusFileData {
 	status := kw.UnredactedStatus()
-	ed, ok := status.ExtraData.(*kubeExtraData)
+	ed, ok := status.ExtraData.(*KubeExtraData)
 	if ok {
 		ed.KubeConfig = ""
 		ed.KubePod = ""
@@ -1217,7 +1314,7 @@ func (kw *kubeUnit) UnredactedStatus() *StatusFileData {
 	kw.GetStatusLock().RLock()
 	defer kw.GetStatusLock().RUnlock()
 	status := kw.GetStatusWithoutExtraData()
-	ked, ok := kw.GetStatusCopy().ExtraData.(*kubeExtraData)
+	ked, ok := kw.GetStatusCopy().ExtraData.(*KubeExtraData)
 	if ok {
 		kedCopy := *ked
 		status.ExtraData = &kedCopy
@@ -1246,7 +1343,7 @@ func (kw *kubeUnit) startOrRestart() error {
 
 // Restart resumes monitoring a job after a Receptor restart.
 func (kw *kubeUnit) Restart() error {
 	status := kw.Status()
-	ked := status.ExtraData.(*kubeExtraData)
+	ked := status.ExtraData.(*KubeExtraData)
 	if IsComplete(status.State) {
 		return nil
 	}
@@ -1260,7 +1357,7 @@ func (kw *kubeUnit) Restart() error {
 	if err != nil {
 		kw.GetWorkceptor().nc.GetLogger().Warning("Pod %s could not be deleted: %s", ked.PodName, err.Error())
 	} else {
-		err := kw.clientset.CoreV1().Pods(ked.KubeNamespace).Delete(context.Background(), ked.PodName, metav1.DeleteOptions{})
+		err := KubeAPIWrapperInstance.Delete(kw.clientset, ked.KubeNamespace, context.Background(), ked.PodName, metav1.DeleteOptions{})
 		if err != nil {
 			kw.GetWorkceptor().nc.GetLogger().Warning("Pod %s could not be deleted: %s", ked.PodName, err.Error())
 		}
@@ -1285,7 +1382,7 @@ func (kw *kubeUnit) Cancel() error {
 	kw.CancelContext()
 	kw.UpdateBasicStatus(WorkStateCanceled, "Canceled", -1)
 	if kw.pod != nil {
-		err := kw.clientset.CoreV1().Pods(kw.pod.Namespace).Delete(context.Background(), kw.pod.Name, metav1.DeleteOptions{})
+		err := KubeAPIWrapperInstance.Delete(kw.clientset, kw.pod.Namespace, context.Background(), kw.pod.Name, metav1.DeleteOptions{})
 		if err != nil {
 			kw.GetWorkceptor().nc.GetLogger().Error("Error deleting pod %s: %s", kw.pod.Name, err)
 		}
@@ -1332,10 +1429,14 @@ type KubeWorkerCfg struct {
 
 // NewWorker is a factory to produce worker instances.
 func (cfg KubeWorkerCfg) NewWorker(bwu BaseWorkUnitForWorkUnit, w *Workceptor, unitID string, workType string) WorkUnit {
+	return cfg.NewkubeWorker(bwu, w, unitID, workType, nil)
+}
+
+func (cfg KubeWorkerCfg) NewkubeWorker(bwu BaseWorkUnitForWorkUnit, w *Workceptor, unitID string, workType string, kawi KubeAPIer) WorkUnit {
 	if bwu == nil {
 		bwu = &BaseWorkUnit{
 			status: StatusFileData{
-				ExtraData: &kubeExtraData{
+				ExtraData: &KubeExtraData{
 					Image:         cfg.Image,
 					Command:       cfg.Command,
 					KubeNamespace: cfg.Namespace,
@@ -1346,6 +1447,11 @@ func (cfg KubeWorkerCfg) NewWorker(bwu BaseWorkUnitForWorkUnit, w *Workceptor, u
 		}
 	}
 
+	KubeAPIWrapperInstance = KubeAPIWrapper{}
+	if kawi != nil {
+		KubeAPIWrapperInstance = kawi
+	}
+
 	ku := &kubeUnit{
 		BaseWorkUnitForWorkUnit: bwu,
 		authMethod:              strings.ToLower(cfg.AuthMethod),
diff --git a/pkg/workceptor/kubernetes_test.go b/pkg/workceptor/kubernetes_test.go
index 23500ede9..f7eb4dea2 100644
--- a/pkg/workceptor/kubernetes_test.go
+++ b/pkg/workceptor/kubernetes_test.go
@@ -1,10 +1,26 @@
-package workceptor
+package workceptor_test
 
 import (
+	"context"
 	"os"
 	"reflect"
+	"sync"
 	"testing"
 	"time"
+
+	"github.com/ansible/receptor/pkg/logger"
+	"github.com/ansible/receptor/pkg/workceptor"
+	"github.com/ansible/receptor/pkg/workceptor/mock_workceptor"
+	"github.com/golang/mock/gomock"
+	corev1 "k8s.io/api/core/v1"
+	apierrors "k8s.io/apimachinery/pkg/api/errors"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/fields"
+	"k8s.io/apimachinery/pkg/selection"
+	"k8s.io/apimachinery/pkg/watch"
+	"k8s.io/client-go/kubernetes"
+	"k8s.io/client-go/rest"
+	"k8s.io/client-go/tools/remotecommand"
 )
 
 func TestShouldUseReconnect(t *testing.T) {
@@ -50,7 +66,7 @@ func TestShouldUseReconnect(t *testing.T) {
 				os.Unsetenv(envVariable)
 			}
 
-			if got := shouldUseReconnect(); got != tt.want {
+			if got := workceptor.ShouldUseReconnect(); got != tt.want {
 				t.Errorf("shouldUseReconnect() = %v, want %v", got, tt.want)
 			}
 		})
@@ -88,9 +104,114 @@ func TestParseTime(t *testing.T) {
 	}
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
-			if got := parseTime(tt.args.s); !reflect.DeepEqual(got, tt.want) {
+			if got := workceptor.ParseTime(tt.args.s); !reflect.DeepEqual(got, tt.want) {
 				t.Errorf("parseTime() = %v, want %v", got, tt.want)
 			}
 		})
 	}
 }
+
+func createKubernetesTestSetup(t *testing.T) (workceptor.WorkUnit, *mock_workceptor.MockBaseWorkUnitForWorkUnit, *mock_workceptor.MockNetceptorForWorkceptor, *workceptor.Workceptor, *mock_workceptor.MockKubeAPIer, context.Context) {
+	ctrl := gomock.NewController(t)
+	ctx := context.Background()
+
+	mockBaseWorkUnit := mock_workceptor.NewMockBaseWorkUnitForWorkUnit(ctrl)
+	mockNetceptor := mock_workceptor.NewMockNetceptorForWorkceptor(ctrl)
+	mockNetceptor.EXPECT().NodeID().Return("NodeID")
+	mockKubeAPI := mock_workceptor.NewMockKubeAPIer(ctrl)
+
+	w, err := workceptor.New(ctx, mockNetceptor, "/tmp")
+	if err != nil {
+		t.Errorf("Error while creating Workceptor: %v", err)
+	}
+
+	mockBaseWorkUnit.EXPECT().Init(w, "", "", workceptor.FileSystem{}, nil)
+	kubeConfig := workceptor.KubeWorkerCfg{AuthMethod: "incluster"}
+	ku := kubeConfig.NewkubeWorker(mockBaseWorkUnit, w, "", "", mockKubeAPI)
+
+	return ku, mockBaseWorkUnit, mockNetceptor, w, mockKubeAPI, ctx
+}
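The setup helper above is the intended injection path: production callers keep using NewWorker, which forwards a nil KubeAPIer and therefore installs the concrete wrapper, while tests call NewkubeWorker directly with a mock. In sketch form (the helper function and its literal arguments are illustrative; bwu, w, and mockKubeAPI would come from a setup like createKubernetesTestSetup):

package workceptor_test

import (
	"github.com/ansible/receptor/pkg/workceptor"
	"github.com/ansible/receptor/pkg/workceptor/mock_workceptor"
)

// newWorkUnits contrasts the two construction paths.
func newWorkUnits(bwu workceptor.BaseWorkUnitForWorkUnit, w *workceptor.Workceptor,
	mockKubeAPI *mock_workceptor.MockKubeAPIer,
) (workceptor.WorkUnit, workceptor.WorkUnit) {
	cfg := workceptor.KubeWorkerCfg{AuthMethod: "incluster"}

	// Production path: NewWorker passes a nil KubeAPIer, so the concrete
	// KubeAPIWrapper ends up in KubeAPIWrapperInstance.
	prod := cfg.NewWorker(bwu, w, "unitID", "workType")

	// Test path: the mock replaces every client-go touchpoint.
	mocked := cfg.NewkubeWorker(bwu, w, "unitID", "workType", mockKubeAPI)

	return prod, mocked
}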
+
+type hasTerm struct {
+	field, value string
+}
+
+func (h *hasTerm) DeepCopySelector() fields.Selector { return h }
+func (h *hasTerm) Empty() bool                       { return true }
+func (h *hasTerm) Matches(ls fields.Fields) bool     { return true }
+func (h *hasTerm) Requirements() fields.Requirements {
+	return []fields.Requirement{{
+		Field:    h.field,
+		Operator: selection.Equals,
+		Value:    h.value,
+	}}
+}
+func (h *hasTerm) RequiresExactMatch(field string) (value string, found bool) { return "", true }
+func (h *hasTerm) String() string                                             { return "Test" }
+func (h *hasTerm) Transform(fn fields.TransformFunc) (fields.Selector, error) { return h, nil }
+
+type ex struct {
+}
+
+func (e *ex) Stream(options remotecommand.StreamOptions) error {
+	return nil
+}
+
+func (e *ex) StreamWithContext(ctx context.Context, options remotecommand.StreamOptions) error {
+	return nil
+}
+
+func TestKubeStart(t *testing.T) {
+	ku, mockbwu, mockNet, w, mockKubeAPI, ctx := createKubernetesTestSetup(t)
+
+	startTestCases := []struct {
+		name string
+	}{
+		{name: "test1"},
+	}
+
+	for _, testCase := range startTestCases {
+		t.Run(testCase.name, func(t *testing.T) {
+			mockbwu.EXPECT().UpdateBasicStatus(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes()
+			config := rest.Config{}
+			mockKubeAPI.EXPECT().InClusterConfig().Return(&config, nil)
+			mockbwu.EXPECT().GetWorkceptor().Return(w).AnyTimes()
+			logger := logger.NewReceptorLogger("")
+			mockNet.EXPECT().GetLogger().Return(logger).AnyTimes()
+			clientset := kubernetes.Clientset{}
+			mockKubeAPI.EXPECT().NewForConfig(gomock.Any()).Return(&clientset, nil)
+			mockbwu.EXPECT().MonitorLocalStatus().AnyTimes()
+			lock := &sync.RWMutex{}
+			mockbwu.EXPECT().GetStatusLock().Return(lock).AnyTimes()
+			kubeExtraData := workceptor.KubeExtraData{}
+			status := workceptor.StatusFileData{ExtraData: &kubeExtraData}
+			mockbwu.EXPECT().GetStatusWithoutExtraData().Return(&status).AnyTimes()
+			mockbwu.EXPECT().GetStatusCopy().Return(status).AnyTimes()
+			mockbwu.EXPECT().GetContext().Return(ctx).AnyTimes()
+			pod := corev1.Pod{metav1.TypeMeta{}, metav1.ObjectMeta{Name: "Test Name"}, corev1.PodSpec{}, corev1.PodStatus{}}
+
+			mockKubeAPI.EXPECT().Create(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(&pod, nil).AnyTimes()
+			mockbwu.EXPECT().UpdateFullStatus(gomock.Any()).AnyTimes()
+
+			field := hasTerm{}
+			mockKubeAPI.EXPECT().OneTermEqualSelector(gomock.Any(), gomock.Any()).Return(&field).AnyTimes()
+			ev := watch.Event{Object: &pod}
+			mockKubeAPI.EXPECT().UntilWithSync(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(&ev, nil).AnyTimes()
+			apierr := apierrors.StatusError{}
+			mockKubeAPI.EXPECT().NewNotFound(gomock.Any(), gomock.Any()).Return(&apierr).AnyTimes()
+			mockbwu.EXPECT().MonitorLocalStatus().AnyTimes()
+
+			c := rest.RESTClient{}
+			req := rest.NewRequest(&c)
+			mockKubeAPI.EXPECT().SubResource(gomock.Any(), gomock.Any(), gomock.Any()).Return(req).AnyTimes()
+			exec := ex{}
+			mockKubeAPI.EXPECT().NewSPDYExecutor(gomock.Any(), gomock.Any(), gomock.Any()).Return(&exec, nil).AnyTimes()
+			mockbwu.EXPECT().UnitDir().Return("TestDir").AnyTimes()
+
+			err := ku.Start()
+			if err != nil {
+				t.Error(err)
+			}
+		})
+	}
+}
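The new file below is GoMock output; its header names pkg/workceptor/kubernetes.go as the source, so it is presumably regenerated with mockgen against that file rather than edited by hand. Each interface method gets a mock implementation plus a recorder method, and driving it follows the usual gomock pattern, mirrored here in a minimal sketch (the test function is illustrative):

package workceptor_test

import (
	"testing"

	"github.com/ansible/receptor/pkg/workceptor/mock_workceptor"
	"github.com/golang/mock/gomock"
	"k8s.io/client-go/rest"
)

// TestMockSketch shows the EXPECT/Return flow on the generated mock:
// EXPECT() returns the recorder, and each stubbed call declares its
// return values.
func TestMockSketch(t *testing.T) {
	ctrl := gomock.NewController(t)
	defer ctrl.Finish()

	mockKubeAPI := mock_workceptor.NewMockKubeAPIer(ctrl)
	mockKubeAPI.EXPECT().InClusterConfig().Return(&rest.Config{}, nil)

	// Code under test now sees the canned config instead of touching a
	// real cluster.
	cfg, err := mockKubeAPI.InClusterConfig()
	if cfg == nil || err != nil {
		t.Error("expected the stubbed config")
	}
}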
diff --git a/pkg/workceptor/mock_workceptor/kubernetes.go b/pkg/workceptor/mock_workceptor/kubernetes.go
new file mode 100644
index 000000000..82c93d6af
--- /dev/null
+++ b/pkg/workceptor/mock_workceptor/kubernetes.go
@@ -0,0 +1,330 @@
+// Code generated by MockGen. DO NOT EDIT.
+// Source: pkg/workceptor/kubernetes.go
+
+// Package mock_workceptor is a generated GoMock package.
+package mock_workceptor
+
+import (
+	context "context"
+	gomock "github.com/golang/mock/gomock"
+	v1 "k8s.io/api/core/v1"
+	errors "k8s.io/apimachinery/pkg/api/errors"
+	v10 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	fields "k8s.io/apimachinery/pkg/fields"
+	runtime "k8s.io/apimachinery/pkg/runtime"
+	schema "k8s.io/apimachinery/pkg/runtime/schema"
+	watch "k8s.io/apimachinery/pkg/watch"
+	kubernetes "k8s.io/client-go/kubernetes"
+	rest "k8s.io/client-go/rest"
+	cache "k8s.io/client-go/tools/cache"
+	clientcmd "k8s.io/client-go/tools/clientcmd"
+	remotecommand "k8s.io/client-go/tools/remotecommand"
+	watch0 "k8s.io/client-go/tools/watch"
+	flowcontrol "k8s.io/client-go/util/flowcontrol"
+	url "net/url"
+	reflect "reflect"
+)
+
+// MockKubeAPIer is a mock of KubeAPIer interface
+type MockKubeAPIer struct {
+	ctrl     *gomock.Controller
+	recorder *MockKubeAPIerMockRecorder
+}
+
+// MockKubeAPIerMockRecorder is the mock recorder for MockKubeAPIer
+type MockKubeAPIerMockRecorder struct {
+	mock *MockKubeAPIer
+}
+
+// NewMockKubeAPIer creates a new mock instance
+func NewMockKubeAPIer(ctrl *gomock.Controller) *MockKubeAPIer {
+	mock := &MockKubeAPIer{ctrl: ctrl}
+	mock.recorder = &MockKubeAPIerMockRecorder{mock}
+	return mock
+}
+
+// EXPECT returns an object that allows the caller to indicate expected use
+func (m *MockKubeAPIer) EXPECT() *MockKubeAPIerMockRecorder {
+	return m.recorder
+}
+
+// NewNotFound mocks base method
+func (m *MockKubeAPIer) NewNotFound(qualifiedResource schema.GroupResource, name string) *errors.StatusError {
+	m.ctrl.T.Helper()
+	ret := m.ctrl.Call(m, "NewNotFound", qualifiedResource, name)
+	ret0, _ := ret[0].(*errors.StatusError)
+	return ret0
+}
+
+// NewNotFound indicates an expected call of NewNotFound
+func (mr *MockKubeAPIerMockRecorder) NewNotFound(qualifiedResource, name interface{}) *gomock.Call {
+	mr.mock.ctrl.T.Helper()
+	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "NewNotFound", reflect.TypeOf((*MockKubeAPIer)(nil).NewNotFound), qualifiedResource, name)
+}
+
+// OneTermEqualSelector mocks base method
+func (m *MockKubeAPIer) OneTermEqualSelector(k, v string) fields.Selector {
+	m.ctrl.T.Helper()
+	ret := m.ctrl.Call(m, "OneTermEqualSelector", k, v)
+	ret0, _ := ret[0].(fields.Selector)
+	return ret0
+}
+
+// OneTermEqualSelector indicates an expected call of OneTermEqualSelector
+func (mr *MockKubeAPIerMockRecorder) OneTermEqualSelector(k, v interface{}) *gomock.Call {
+	mr.mock.ctrl.T.Helper()
+	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "OneTermEqualSelector", reflect.TypeOf((*MockKubeAPIer)(nil).OneTermEqualSelector), k, v)
+}
+
+// NewForConfig mocks base method
+func (m *MockKubeAPIer) NewForConfig(c *rest.Config) (*kubernetes.Clientset, error) {
+	m.ctrl.T.Helper()
+	ret := m.ctrl.Call(m, "NewForConfig", c)
+	ret0, _ := ret[0].(*kubernetes.Clientset)
+	ret1, _ := ret[1].(error)
+	return ret0, ret1
+}
+
+// NewForConfig indicates an expected call of NewForConfig
+func (mr *MockKubeAPIerMockRecorder) NewForConfig(c interface{}) *gomock.Call {
+	mr.mock.ctrl.T.Helper()
+	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "NewForConfig", reflect.TypeOf((*MockKubeAPIer)(nil).NewForConfig), c)
+}
+
+// GetLogs mocks base method
+func (m *MockKubeAPIer) GetLogs(clientset *kubernetes.Clientset, namespace, name string, opts *v1.PodLogOptions) *rest.Request {
+	m.ctrl.T.Helper()
+	ret := m.ctrl.Call(m, "GetLogs", clientset, namespace, name, opts)
+	ret0, _ := ret[0].(*rest.Request)
+	return ret0
+}
+
+// GetLogs indicates an expected call of GetLogs
+func (mr *MockKubeAPIerMockRecorder) GetLogs(clientset, namespace, name, opts interface{}) *gomock.Call {
+	mr.mock.ctrl.T.Helper()
+	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetLogs", reflect.TypeOf((*MockKubeAPIer)(nil).GetLogs), clientset, namespace, name, opts)
+}
+
+// Get mocks base method
+func (m *MockKubeAPIer) Get(clientset *kubernetes.Clientset, namespace string, ctx context.Context, name string, opts v10.GetOptions) (*v1.Pod, error) {
+	m.ctrl.T.Helper()
+	ret := m.ctrl.Call(m, "Get", clientset, namespace, ctx, name, opts)
+	ret0, _ := ret[0].(*v1.Pod)
+	ret1, _ := ret[1].(error)
+	return ret0, ret1
+}
+
+// Get indicates an expected call of Get
+func (mr *MockKubeAPIerMockRecorder) Get(clientset, namespace, ctx, name, opts interface{}) *gomock.Call {
+	mr.mock.ctrl.T.Helper()
+	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Get", reflect.TypeOf((*MockKubeAPIer)(nil).Get), clientset, namespace, ctx, name, opts)
+}
+
+// Create mocks base method
+func (m *MockKubeAPIer) Create(clientset *kubernetes.Clientset, namespace string, ctx context.Context, pod *v1.Pod, opts v10.CreateOptions) (*v1.Pod, error) {
+	m.ctrl.T.Helper()
+	ret := m.ctrl.Call(m, "Create", clientset, namespace, ctx, pod, opts)
+	ret0, _ := ret[0].(*v1.Pod)
+	ret1, _ := ret[1].(error)
+	return ret0, ret1
+}
+
+// Create indicates an expected call of Create
+func (mr *MockKubeAPIerMockRecorder) Create(clientset, namespace, ctx, pod, opts interface{}) *gomock.Call {
+	mr.mock.ctrl.T.Helper()
+	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Create", reflect.TypeOf((*MockKubeAPIer)(nil).Create), clientset, namespace, ctx, pod, opts)
+}
+
+// List mocks base method
+func (m *MockKubeAPIer) List(clientset *kubernetes.Clientset, namespace string, ctx context.Context, opts v10.ListOptions) (*v1.PodList, error) {
+	m.ctrl.T.Helper()
+	ret := m.ctrl.Call(m, "List", clientset, namespace, ctx, opts)
+	ret0, _ := ret[0].(*v1.PodList)
+	ret1, _ := ret[1].(error)
+	return ret0, ret1
+}
+
+// List indicates an expected call of List
+func (mr *MockKubeAPIerMockRecorder) List(clientset, namespace, ctx, opts interface{}) *gomock.Call {
+	mr.mock.ctrl.T.Helper()
+	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "List", reflect.TypeOf((*MockKubeAPIer)(nil).List), clientset, namespace, ctx, opts)
+}
+
+// Watch mocks base method
+func (m *MockKubeAPIer) Watch(clientset *kubernetes.Clientset, namespace string, ctx context.Context, opts v10.ListOptions) (watch.Interface, error) {
+	m.ctrl.T.Helper()
+	ret := m.ctrl.Call(m, "Watch", clientset, namespace, ctx, opts)
+	ret0, _ := ret[0].(watch.Interface)
+	ret1, _ := ret[1].(error)
+	return ret0, ret1
+}
+
+// Watch indicates an expected call of Watch
+func (mr *MockKubeAPIerMockRecorder) Watch(clientset, namespace, ctx, opts interface{}) *gomock.Call {
+	mr.mock.ctrl.T.Helper()
+	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Watch", reflect.TypeOf((*MockKubeAPIer)(nil).Watch), clientset, namespace, ctx, opts)
+}
+
+// Delete mocks base method
+func (m *MockKubeAPIer) Delete(clientset *kubernetes.Clientset, namespace string, ctx context.Context, name string, opts v10.DeleteOptions) error {
+	m.ctrl.T.Helper()
+	ret := m.ctrl.Call(m, "Delete", clientset, namespace, ctx, name, opts)
+	ret0, _ := ret[0].(error)
+	return ret0
+}
+
+// Delete indicates an expected call of Delete
+func (mr *MockKubeAPIerMockRecorder) Delete(clientset, namespace, ctx, name, opts interface{}) *gomock.Call {
+	mr.mock.ctrl.T.Helper()
+	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Delete", reflect.TypeOf((*MockKubeAPIer)(nil).Delete), clientset, namespace, ctx, name, opts)
+}
+
+// SubResource mocks base method
+func (m *MockKubeAPIer) SubResource(clientset *kubernetes.Clientset, podName, podNamespace string) *rest.Request {
+	m.ctrl.T.Helper()
+	ret := m.ctrl.Call(m, "SubResource", clientset, podName, podNamespace)
+	ret0, _ := ret[0].(*rest.Request)
+	return ret0
+}
+
+// SubResource indicates an expected call of SubResource
+func (mr *MockKubeAPIerMockRecorder) SubResource(clientset, podName, podNamespace interface{}) *gomock.Call {
+	mr.mock.ctrl.T.Helper()
+	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SubResource", reflect.TypeOf((*MockKubeAPIer)(nil).SubResource), clientset, podName, podNamespace)
+}
+
+// InClusterConfig mocks base method
+func (m *MockKubeAPIer) InClusterConfig() (*rest.Config, error) {
+	m.ctrl.T.Helper()
+	ret := m.ctrl.Call(m, "InClusterConfig")
+	ret0, _ := ret[0].(*rest.Config)
+	ret1, _ := ret[1].(error)
+	return ret0, ret1
+}
+
+// InClusterConfig indicates an expected call of InClusterConfig
+func (mr *MockKubeAPIerMockRecorder) InClusterConfig() *gomock.Call {
+	mr.mock.ctrl.T.Helper()
+	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "InClusterConfig", reflect.TypeOf((*MockKubeAPIer)(nil).InClusterConfig))
+}
+
+// NewDefaultClientConfigLoadingRules mocks base method
+func (m *MockKubeAPIer) NewDefaultClientConfigLoadingRules() *clientcmd.ClientConfigLoadingRules {
+	m.ctrl.T.Helper()
+	ret := m.ctrl.Call(m, "NewDefaultClientConfigLoadingRules")
+	ret0, _ := ret[0].(*clientcmd.ClientConfigLoadingRules)
+	return ret0
+}
+
+// NewDefaultClientConfigLoadingRules indicates an expected call of NewDefaultClientConfigLoadingRules
+func (mr *MockKubeAPIerMockRecorder) NewDefaultClientConfigLoadingRules() *gomock.Call {
+	mr.mock.ctrl.T.Helper()
+	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "NewDefaultClientConfigLoadingRules", reflect.TypeOf((*MockKubeAPIer)(nil).NewDefaultClientConfigLoadingRules))
+}
+
+// BuildConfigFromFlags mocks base method
+func (m *MockKubeAPIer) BuildConfigFromFlags(masterUrl, kubeconfigPath string) (*rest.Config, error) {
+	m.ctrl.T.Helper()
+	ret := m.ctrl.Call(m, "BuildConfigFromFlags", masterUrl, kubeconfigPath)
+	ret0, _ := ret[0].(*rest.Config)
+	ret1, _ := ret[1].(error)
+	return ret0, ret1
+}
+
+// BuildConfigFromFlags indicates an expected call of BuildConfigFromFlags
+func (mr *MockKubeAPIerMockRecorder) BuildConfigFromFlags(masterUrl, kubeconfigPath interface{}) *gomock.Call {
+	mr.mock.ctrl.T.Helper()
+	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "BuildConfigFromFlags", reflect.TypeOf((*MockKubeAPIer)(nil).BuildConfigFromFlags), masterUrl, kubeconfigPath)
+}
+
+// NewClientConfigFromBytes mocks base method
+func (m *MockKubeAPIer) NewClientConfigFromBytes(configBytes []byte) (clientcmd.ClientConfig, error) {
+	m.ctrl.T.Helper()
+	ret := m.ctrl.Call(m, "NewClientConfigFromBytes", configBytes)
+	ret0, _ := ret[0].(clientcmd.ClientConfig)
+	ret1, _ := ret[1].(error)
+	return ret0, ret1
+}
+
+// NewClientConfigFromBytes indicates an expected call of NewClientConfigFromBytes
+func (mr *MockKubeAPIerMockRecorder) NewClientConfigFromBytes(configBytes interface{}) *gomock.Call {
+	mr.mock.ctrl.T.Helper()
+	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "NewClientConfigFromBytes", reflect.TypeOf((*MockKubeAPIer)(nil).NewClientConfigFromBytes), configBytes)
+}
+
+// NewSPDYExecutor mocks base method
+func (m *MockKubeAPIer) NewSPDYExecutor(config *rest.Config, method string, url *url.URL) (remotecommand.Executor, error) {
+	m.ctrl.T.Helper()
+	ret := m.ctrl.Call(m, "NewSPDYExecutor", config, method, url)
+	ret0, _ := ret[0].(remotecommand.Executor)
+	ret1, _ := ret[1].(error)
+	return ret0, ret1
+}
+
+// NewSPDYExecutor indicates an expected call of NewSPDYExecutor
+func (mr *MockKubeAPIerMockRecorder) NewSPDYExecutor(config, method, url interface{}) *gomock.Call {
+	mr.mock.ctrl.T.Helper()
+	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "NewSPDYExecutor", reflect.TypeOf((*MockKubeAPIer)(nil).NewSPDYExecutor), config, method, url)
+}
+
+// StreamWithContext mocks base method
+func (m *MockKubeAPIer) StreamWithContext(exec remotecommand.Executor, ctx context.Context, options remotecommand.StreamOptions) error {
+	m.ctrl.T.Helper()
+	ret := m.ctrl.Call(m, "StreamWithContext", exec, ctx, options)
+	ret0, _ := ret[0].(error)
+	return ret0
+}
+
+// StreamWithContext indicates an expected call of StreamWithContext
+func (mr *MockKubeAPIerMockRecorder) StreamWithContext(exec, ctx, options interface{}) *gomock.Call {
+	mr.mock.ctrl.T.Helper()
+	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "StreamWithContext", reflect.TypeOf((*MockKubeAPIer)(nil).StreamWithContext), exec, ctx, options)
+}
+
+// UntilWithSync mocks base method
+func (m *MockKubeAPIer) UntilWithSync(ctx context.Context, lw cache.ListerWatcher, objType runtime.Object, precondition watch0.PreconditionFunc, conditions ...watch0.ConditionFunc) (*watch.Event, error) {
+	m.ctrl.T.Helper()
+	varargs := []interface{}{ctx, lw, objType, precondition}
+	for _, a := range conditions {
+		varargs = append(varargs, a)
+	}
+	ret := m.ctrl.Call(m, "UntilWithSync", varargs...)
+	ret0, _ := ret[0].(*watch.Event)
+	ret1, _ := ret[1].(error)
+	return ret0, ret1
+}
+
+// UntilWithSync indicates an expected call of UntilWithSync
+func (mr *MockKubeAPIerMockRecorder) UntilWithSync(ctx, lw, objType, precondition interface{}, conditions ...interface{}) *gomock.Call {
+	mr.mock.ctrl.T.Helper()
+	varargs := append([]interface{}{ctx, lw, objType, precondition}, conditions...)
+	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UntilWithSync", reflect.TypeOf((*MockKubeAPIer)(nil).UntilWithSync), varargs...)
+}
+
+// NewFakeNeverRateLimiter mocks base method
+func (m *MockKubeAPIer) NewFakeNeverRateLimiter() flowcontrol.RateLimiter {
+	m.ctrl.T.Helper()
+	ret := m.ctrl.Call(m, "NewFakeNeverRateLimiter")
+	ret0, _ := ret[0].(flowcontrol.RateLimiter)
+	return ret0
+}
+
+// NewFakeNeverRateLimiter indicates an expected call of NewFakeNeverRateLimiter
+func (mr *MockKubeAPIerMockRecorder) NewFakeNeverRateLimiter() *gomock.Call {
+	mr.mock.ctrl.T.Helper()
+	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "NewFakeNeverRateLimiter", reflect.TypeOf((*MockKubeAPIer)(nil).NewFakeNeverRateLimiter))
+}
+
+// NewFakeAlwaysRateLimiter mocks base method
+func (m *MockKubeAPIer) NewFakeAlwaysRateLimiter() flowcontrol.RateLimiter {
+	m.ctrl.T.Helper()
+	ret := m.ctrl.Call(m, "NewFakeAlwaysRateLimiter")
+	ret0, _ := ret[0].(flowcontrol.RateLimiter)
+	return ret0
+}
+
+// NewFakeAlwaysRateLimiter indicates an expected call of NewFakeAlwaysRateLimiter
+func (mr *MockKubeAPIerMockRecorder) NewFakeAlwaysRateLimiter() *gomock.Call {
+	mr.mock.ctrl.T.Helper()
+	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "NewFakeAlwaysRateLimiter", reflect.TypeOf((*MockKubeAPIer)(nil).NewFakeAlwaysRateLimiter))
+}