
Commit

Do not delete the k8s pod in the provider; add notify after handling biz in the provider
lvjing2 committed Nov 12, 2024
1 parent a54e4d2 commit 0bb5e66
Showing 3 changed files with 50 additions and 96 deletions.
4 changes: 2 additions & 2 deletions tunnel/mock_tunnel.go
@@ -111,7 +111,7 @@ func (m *MockTunnel) QueryAllBizStatusData(ctx context.Context, nodeID string) e
return nil
}

func (m *MockTunnel) StartContainer(ctx context.Context, nodeID, podKey string, container *corev1.Container) error {
func (m *MockTunnel) StartBiz(ctx context.Context, nodeID, podKey string, container *corev1.Container) error {
m.Lock()
defer m.Unlock()
key := utils.GetBizUniqueKey(container)
@@ -137,7 +137,7 @@ func (m *MockTunnel) StartContainer(ctx context.Context, nodeID, podKey string,
return nil
}

func (m *MockTunnel) ShutdownContainer(ctx context.Context, nodeID, podKey string, container *corev1.Container) error {
func (m *MockTunnel) StopBiz(ctx context.Context, nodeID, podKey string, container *corev1.Container) error {
m.Lock()
defer m.Unlock()
containerMap := m.bizStatusStorage[nodeID]
8 changes: 4 additions & 4 deletions tunnel/tunnel.go
@@ -46,11 +46,11 @@ type Tunnel interface {
// QueryAllBizStatusData is the func call for vnode to fetch all containers status data , you need to fetch all containers status data and call OnAllBizStatusArrived when data arrived
QueryAllBizStatusData(ctx context.Context, nodeID string) error

// StartContainer is the func calls for vnode to start a container , you need to start container and call OnStartContainerResponseArrived when start complete with a response
StartContainer(ctx context.Context, nodeID, podKey string, container *v1.Container) error
// StartBiz is the func called by vnode to start a biz instance; you need to start it and call OnStartBizResponseArrived when the start completes with a response
StartBiz(ctx context.Context, nodeID, podKey string, container *v1.Container) error

// ShutdownContainer is the func calls for vnode to shut down a container , you need to start to shut down container and call OnShutdownContainerResponseArrived when shut down process complete with a response
ShutdownContainer(ctx context.Context, nodeID, podKey string, container *v1.Container) error
// StopBiz is the func called by vnode to stop a biz instance; you need to shut it down and call OnShutdownContainerResponseArrived when the shutdown completes with a response
StopBiz(ctx context.Context, nodeID, podKey string, container *v1.Container) error

// GetBizUniqueKey is the func returns a unique key of a container in a pod, vnode will use this unique key to find target Container status
GetBizUniqueKey(container *v1.Container) string
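For anyone maintaining their own Tunnel implementation, the rename is mechanical: only the method names change from StartContainer/ShutdownContainer to StartBiz/StopBiz, while the signatures stay the same. A minimal sketch of the adjustment is shown below; the httpTunnel type, its package, and the empty bodies are hypothetical placeholders rather than code from this repository:

```go
package mytunnel

import (
	"context"

	corev1 "k8s.io/api/core/v1"
)

// httpTunnel is a hypothetical Tunnel implementation used only to illustrate the rename.
type httpTunnel struct{}

// Formerly StartContainer: start the biz instance for the given pod/container and,
// once the result is known, report it via the OnStartBizResponseArrived callback (not shown here).
func (t *httpTunnel) StartBiz(ctx context.Context, nodeID, podKey string, container *corev1.Container) error {
	// placeholder: a real implementation would dispatch the start command to the underlying runtime
	return nil
}

// Formerly ShutdownContainer: stop the biz instance and report completion via the
// shutdown-response callback when the shutdown finishes.
func (t *httpTunnel) StopBiz(ctx context.Context, nodeID, podKey string, container *corev1.Container) error {
	// placeholder: a real implementation would dispatch the stop command to the underlying runtime
	return nil
}
```

Callers such as VPodProvider invoke these methods through the retry helper, as the provider.go diff below shows.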
134 changes: 44 additions & 90 deletions vnode/pod_provider/provider.go
@@ -16,7 +16,6 @@ package pod_provider

import (
"context"
"k8s.io/apimachinery/pkg/api/errors"
"sort"
"strings"
"time"
@@ -28,8 +27,8 @@ import (
"github.com/koupleless/virtual-kubelet/tunnel"
"github.com/koupleless/virtual-kubelet/virtual_kubelet"
"github.com/koupleless/virtual-kubelet/virtual_kubelet/nodeutil"
pkgerrors "github.com/pkg/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/utils/ptr"
"sigs.k8s.io/controller-runtime/pkg/client"

"github.com/koupleless/virtual-kubelet/common/log"
@@ -159,19 +158,8 @@ func (b *VPodProvider) SyncOneContainerInfo(ctx context.Context, bizStatusData m
}
}

// startContainer is a method of VPodProvider that starts a container
func (b *VPodProvider) startContainer(ctx context.Context, podKey string, container *corev1.Container) error {
// clear local container status cache
return b.tunnel.StartContainer(ctx, b.nodeID, podKey, container)
}

// stopContainer is a method of VPodProvider that stops a container
func (b *VPodProvider) stopContainer(ctx context.Context, podKey string, container *corev1.Container) error {
return b.tunnel.ShutdownContainer(ctx, b.nodeID, podKey, container)
}

// handleContainerStart is a method of VPodProvider that handles the start of a container
func (b *VPodProvider) handleContainerStart(ctx context.Context, pod *corev1.Pod, containers []corev1.Container) {
// handleBizBatchStart is a method of VPodProvider that handles the batch start of biz containers
func (b *VPodProvider) handleBizBatchStart(ctx context.Context, pod *corev1.Pod, containers []corev1.Container) {
podKey := utils.GetPodKey(pod)

logger := log.G(ctx).WithField("podKey", podKey)
@@ -185,7 +173,7 @@ func (b *VPodProvider) handleContainerStart(ctx context.Context, pod *corev1.Pod
for _, container := range containers {
err := tracker.G().FuncTrack(labelMap[model.LabelKeyOfTraceID], model.TrackSceneVPodDeploy, model.TrackEventContainerStart, labelMap, func() (error, model.ErrorCode) {
err := utils.CallWithRetry(ctx, func(_ int) (bool, error) {
innerErr := b.startContainer(ctx, podKey, &container)
innerErr := b.tunnel.StartBiz(ctx, b.nodeID, podKey, &container)

return innerErr != nil, innerErr
}, nil)
@@ -200,8 +188,8 @@ func (b *VPodProvider) handleContainerStart(ctx context.Context, pod *corev1.Pod
}
}

// handleContainerShutdown is a method of VPodProvider that handles the shutdown of a container
func (b *VPodProvider) handleContainerShutdown(ctx context.Context, pod *corev1.Pod, containers []corev1.Container) {
// handleBizBatchStop is a method of VPodProvider that handles the batch shutdown of biz containers
func (b *VPodProvider) handleBizBatchStop(ctx context.Context, pod *corev1.Pod, containers []corev1.Container) {
podKey := utils.GetPodKey(pod)

logger := log.G(ctx).WithField("podKey", podKey)
@@ -215,7 +203,7 @@ func (b *VPodProvider) handleContainerShutdown(ctx context.Context, pod *corev1.
for _, container := range containers {
err := tracker.G().FuncTrack(labelMap[model.LabelKeyOfTraceID], model.TrackSceneVPodDeploy, model.TrackEventContainerShutdown, labelMap, func() (error, model.ErrorCode) {
err := utils.CallWithRetry(ctx, func(_ int) (bool, error) {
innerErr := b.stopContainer(ctx, podKey, &container)
innerErr := b.tunnel.StopBiz(ctx, b.nodeID, podKey, &container)

return innerErr != nil, innerErr
}, nil)
@@ -238,8 +226,8 @@ func (b *VPodProvider) CreatePod(ctx context.Context, pod *corev1.Pod) error {
// update the baseline info so the async handle logic can see them first
podCopy := pod.DeepCopy()
b.runtimeInfoStore.PutPod(podCopy, b.tunnel)
b.handleContainerStart(ctx, podCopy, podCopy.Spec.Containers)

b.handleBizBatchStart(ctx, podCopy, podCopy.Spec.Containers)
b.notify(podCopy)
return nil
}

@@ -258,38 +246,46 @@ func (b *VPodProvider) UpdatePod(ctx context.Context, pod *corev1.Pod) error {
}

oldPod := b.runtimeInfoStore.GetPodByKey(podKey).DeepCopy()
if oldPod == nil {
return pkgerrors.Errorf("pod %s not found when updating", podKey)
}

newContainerMap := make(map[string]corev1.Container)
oldContainerMap := make(map[string]corev1.Container)
for _, container := range newPod.Spec.Containers {
newContainerMap[container.Name] = container
}
shouldUpdateContainers := make([]corev1.Container, 0)
if oldPod != nil {
// stop first
for _, oldContainer := range oldPod.Spec.Containers {
// check need to stop
if newContainer, ok := newContainerMap[oldContainer.Name]; ok && !cmp.Equal(newContainer, oldContainer) {
// sending to stop
shouldUpdateContainers = append(shouldUpdateContainers, oldContainer)
} else {
// delete from new containers
delete(newContainerMap, oldContainer.Name)
}
}
b.handleContainerShutdown(ctx, oldPod, shouldUpdateContainers)
for _, container := range oldPod.Spec.Containers {
oldContainerMap[container.Name] = container
}

if len(shouldUpdateContainers) == 0 {
// no need to stop, just do nothing
return nil
shouldStopContainers := make([]corev1.Container, 0)
shouldStartContainers := make([]corev1.Container, 0)
// find the containers that were updated in the new pod
for name, oldContainer := range oldContainerMap {
if newContainer, has := newContainerMap[name]; has && !cmp.Equal(newContainer, oldContainer) {
shouldStopContainers = append(shouldStopContainers, oldContainer)
shouldStartContainers = append(shouldStartContainers, newContainer)
}
}
// find the new containers that do not exist in the old pod
for name, newContainer := range newContainerMap {
if _, has := oldContainerMap[name]; !has {
shouldStartContainers = append(shouldStartContainers, newContainer)
}
}
if len(shouldStopContainers) > 0 {
b.handleBizBatchStop(ctx, oldPod, shouldStopContainers)
}

b.runtimeInfoStore.PutPod(newPod.DeepCopy(), b.tunnel)

// only start new containers and changed containers
startNewContainer := func() {
b.handleContainerStart(ctx, newPod, shouldUpdateContainers)
if len(shouldStartContainers) == 0 {
b.notify(newPod)
return nil
}

// only start new containers and changed containers
tracker.G().Eventually(pod.Labels[model.LabelKeyOfTraceID], model.TrackSceneVPodDeploy, model.TrackEventVPodUpdate, pod.Labels, model.CodeContainerStartTimeout, func() bool {
podFromKubernetes := &corev1.Pod{}
err := b.client.Get(ctx, client.ObjectKey{
@@ -306,16 +302,19 @@ func (b *VPodProvider) UpdatePod(ctx context.Context, pod *corev1.Pod) error {
nameToContainerStatus[containerStatus.Name] = containerStatus
}

for _, shouldUpdateContainer := range shouldUpdateContainers {
for _, shouldUpdateContainer := range shouldStopContainers {
if status, has := nameToContainerStatus[shouldUpdateContainer.Name]; has && status.State.Terminated == nil {
return false
}
}
return true
}, time.Minute, time.Second, startNewContainer, func() {
}, time.Minute, time.Second, func() {
b.handleBizBatchStart(ctx, newPod, shouldStartContainers)
}, func() {
logger.Error("stop old containers timeout, not start new containers")
})

b.notify(newPod)
return nil
}

@@ -337,53 +336,8 @@ func (b *VPodProvider) DeletePod(ctx context.Context, pod *corev1.Pod) error {

// delete from curr provider
b.runtimeInfoStore.DeletePod(podKey, b.tunnel)

b.handleContainerShutdown(ctx, pod, pod.Spec.Containers)

if pod.DeletionGracePeriodSeconds == nil || *pod.DeletionGracePeriodSeconds == 0 {
// force delete, just return, skip check and delete
logger.Warnf("Pod force delete")
return nil
}

// check all containers shutdown successfully
deletePod := func() {
if b.client != nil {
// delete pod with no grace period, mock kubelet
b.client.Delete(ctx, pod, &client.DeleteOptions{
GracePeriodSeconds: ptr.To[int64](0),
})
}
}

tracker.G().Eventually(pod.Labels[model.LabelKeyOfTraceID], model.TrackSceneVPodDeploy, model.TrackEventVPodDelete, pod.Labels, model.CodeContainerStartTimeout, func() bool {
podFromKubernetes := &corev1.Pod{}
err := b.client.Get(ctx, client.ObjectKey{
Namespace: pod.Namespace,
Name: pod.Name,
}, podFromKubernetes)
if err != nil {
if errors.IsNotFound(err) {
return true
}
logger.WithError(err).Error("Failed to get pod from k8s")
return false
}

nameToContainerStatus := make(map[string]corev1.ContainerStatus)
for _, containerStatus := range podFromKubernetes.Status.ContainerStatuses {
nameToContainerStatus[containerStatus.Name] = containerStatus
}
for _, container := range podFromKubernetes.Spec.Containers {
if status, has := nameToContainerStatus[container.Name]; has && status.State.Terminated == nil {
return false
}
}
return true
}, time.Second*25, time.Second, deletePod, func() {
logger.Error("failed to delete pod: timeout")
})

b.handleBizBatchStop(ctx, pod, pod.Spec.Containers)
b.notify(pod)
return nil
}

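The new UpdatePod flow reduces to a diff over the two container lists: a container whose spec changed is stopped first and its new revision started only after the old one has terminated, while a container that exists only in the new pod is started directly. The sketch below restates that rule as a standalone helper; the function name and its extraction out of UpdatePod are illustrative only, and it assumes cmp is google/go-cmp, as the surrounding code suggests:

```go
package pod_provider

import (
	"github.com/google/go-cmp/cmp"
	corev1 "k8s.io/api/core/v1"
)

// diffContainers mirrors the selection logic in VPodProvider.UpdatePod:
// changed containers are both stopped (old spec) and started (new spec),
// containers that only appear in the new pod are started only.
func diffContainers(oldContainers, newContainers []corev1.Container) (toStop, toStart []corev1.Container) {
	oldMap := make(map[string]corev1.Container, len(oldContainers))
	for _, c := range oldContainers {
		oldMap[c.Name] = c
	}
	newMap := make(map[string]corev1.Container, len(newContainers))
	for _, c := range newContainers {
		newMap[c.Name] = c
	}
	for name, oldC := range oldMap {
		if newC, ok := newMap[name]; ok && !cmp.Equal(newC, oldC) {
			toStop = append(toStop, oldC)
			toStart = append(toStart, newC)
		}
	}
	for name, newC := range newMap {
		if _, ok := oldMap[name]; !ok {
			toStart = append(toStart, newC)
		}
	}
	return toStop, toStart
}
```

Gating the start of the new revisions on the old containers reaching a Terminated state in the pod status keeps two revisions of the same biz from running side by side.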
