This repository has been archived by the owner on Jan 19, 2024. It is now read-only.

feat: Include logs of all containers in error message #214

Merged: 5 commits, merged on May 6, 2022
40 changes: 28 additions & 12 deletions pkg/eventhandler/eventhandlers.go
@@ -5,6 +5,7 @@ import (
"fmt"
"log"
"strconv"
"strings"
"time"

"github.com/keptn/go-utils/pkg/lib/keptn"
@@ -147,7 +148,7 @@ func (eh *EventHandler) startK8sJob(action *config.Action, jsonEventData interfa
if err != nil {
log.Printf("Error while connecting to cluster: %s\n", err.Error())
if !action.Silent {
sendTaskFailedEvent(eh.Keptn, "", eh.ServiceName, err, "")
sendJobFailedEvent(eh.Keptn, "", eh.ServiceName, err)
}
return
}
@@ -165,7 +166,7 @@ func (eh *EventHandler) startK8sJob(action *config.Action, jsonEventData interfa

log.Printf(errorText)
if !action.Silent {
sendTaskFailedEvent(eh.Keptn, "", eh.ServiceName, errors.New(errorText), "")
sendTaskFailedEvent(eh.Keptn, task.Name, eh.ServiceName, errors.New(errorText), "")
}

return
@@ -192,7 +193,7 @@ func (eh *EventHandler) startK8sJob(action *config.Action, jsonEventData interfa
if err != nil {
log.Printf("Error while creating job: %s\n", err)
if !action.Silent {
sendTaskFailedEvent(eh.Keptn, jobName, eh.ServiceName, err, "")
sendTaskFailedEvent(eh.Keptn, task.Name, eh.ServiceName, err, "")
}
return
}
@@ -211,14 +212,14 @@ func (eh *EventHandler) startK8sJob(action *config.Action, jsonEventData interfa
if jobErr != nil {
log.Printf("Error while creating job: %s\n", jobErr.Error())
if !action.Silent {
sendTaskFailedEvent(eh.Keptn, jobName, eh.ServiceName, jobErr, logs)
sendTaskFailedEvent(eh.Keptn, task.Name, eh.ServiceName, jobErr, logs)
}
return
}

allJobLogs = append(
allJobLogs, jobLogs{
name: jobName,
name: task.Name,
logs: logs,
},
)
@@ -233,13 +234,13 @@ func (eh *EventHandler) startK8sJob(action *config.Action, jsonEventData interfa
}
}

func sendTaskFailedEvent(myKeptn *keptnv2.Keptn, jobName string, serviceName string, err error, logs string) {
func sendTaskFailedEvent(myKeptn *keptnv2.Keptn, taskName string, serviceName string, err error, logs string) {
var message string

if logs != "" {
message = fmt.Sprintf("Job %s failed: %s\n\nLogs: \n%s", jobName, err, logs)
message = fmt.Sprintf("Task '%s' failed: %s\n\nLogs: \n%s", taskName, err.Error(), logs)
} else {
message = fmt.Sprintf("Job %s failed: %s", jobName, err)
message = fmt.Sprintf("Task '%s' failed: %s", taskName, err.Error())
}

_, err = myKeptn.SendTaskFinishedEvent(
@@ -255,18 +256,33 @@ func sendTaskFailedEvent(myKeptn *keptnv2.Keptn, jobName string, serviceName str
}
}

func sendJobFailedEvent(myKeptn *keptnv2.Keptn, jobName string, serviceName string, err error) {
_, err = myKeptn.SendTaskFinishedEvent(
&keptnv2.EventData{
Status: keptnv2.StatusErrored,
Result: keptnv2.ResultFailed,
Message: fmt.Sprintf("Job %s failed: %s", jobName, err.Error()),
}, serviceName,
)

if err != nil {
log.Printf("Error while sending started event: %s\n", err)
}
}

func sendTaskFinishedEvent(myKeptn *keptnv2.Keptn, serviceName string, jobLogs []jobLogs, data dataForFinishedEvent) {
var message string
var logMessage strings.Builder

for _, jobLogs := range jobLogs {
message += fmt.Sprintf("Job %s finished successfully!\n\nLogs:\n%s\n\n", jobLogs.name, jobLogs.logs)
logMessage.WriteString(
fmt.Sprintf("Task '%s' finished successfully!\n\nLogs:\n%s\n\n", jobLogs.name, jobLogs.logs),
)
}

eventData := &keptnv2.EventData{

Status: keptnv2.StatusSucceeded,
Result: keptnv2.ResultPass,
Message: message,
Message: logMessage.String(),
}

var err error
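The change above swaps the plain string concatenation for a strings.Builder, so one finished event carries a log block per executed task. The standalone sketch below mirrors that aggregation pattern outside the handler; the jobLogs type is copied from the handler package, and the task names and log contents are made up for illustration.

```go
package main

import (
	"fmt"
	"strings"
)

// jobLogs mirrors the struct used by the event handler: one entry per executed task.
type jobLogs struct {
	name string
	logs string
}

func main() {
	// Hypothetical per-task logs collected after all jobs finished successfully.
	tasks := []jobLogs{
		{name: "build", logs: "compiling...\ndone"},
		{name: "test", logs: "all tests passed"},
	}

	// Same aggregation pattern as sendTaskFinishedEvent: one strings.Builder,
	// one block per task, so a single finished event carries the logs of every task.
	var logMessage strings.Builder
	for _, task := range tasks {
		logMessage.WriteString(
			fmt.Sprintf("Task '%s' finished successfully!\n\nLogs:\n%s\n\n", task.name, task.logs),
		)
	}

	fmt.Print(logMessage.String())
}
```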
14 changes: 14 additions & 0 deletions pkg/eventhandler/fake/eventhandlers_mock.go

Some generated files are not rendered by default.

176 changes: 160 additions & 16 deletions pkg/k8sutils/pod.go
@@ -3,19 +3,17 @@ package k8sutils
import (
"bytes"
"context"
"fmt"
"io"

v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"strings"
)

// GetLogsOfPod returns the k8s logs of a job in a namespace
func (k8s *K8sImpl) GetLogsOfPod(jobName string, namespace string) (string, error) {

// TODO include the logs of the initcontainer

podLogOpts := v1.PodLogOptions{}

list, err := k8s.clientset.CoreV1().Pods(namespace).List(
context.TODO(), metav1.ListOptions{
LabelSelector: "job-name=" + jobName,
@@ -25,24 +23,170 @@ func (k8s *K8sImpl) GetLogsOfPod(jobName string, namespace string) (string, erro
return "", err
}

logs := ""
var logs strings.Builder

for _, pod := range list.Items {

req := k8s.clientset.CoreV1().Pods(namespace).GetLogs(pod.Name, &podLogOpts)
podLogs, err := req.Stream(context.TODO())
if err != nil {
return "", err
printInitContainerLogs := false
containerStatus := getTerminatedContainersWithStatusOfPod(pod)

// Go through all containers and check whether the termination reason is Completed. If not, we found a container
// that exited with an error and therefore have to include the logs of the init containers as well, since files
// might not have been copied over
for _, container := range containerStatus {
if container.status.Reason != "Completed" {
printInitContainerLogs = true
break
}
}
defer podLogs.Close()

buf := new(bytes.Buffer)
_, err = io.Copy(buf, podLogs)
if err != nil {
return "", err
// Query all logs from containers that have terminated and therefore already had the chance to
// produce logs, otherwise the k8s api will return an error
for _, container := range containerStatus {

// If we don't want to print the init container logs, we just skip this iteration of the
// loop
if container.containerType == initContainerType && !printInitContainerLogs {
continue
}

// Query logs of the current selected container
logsOfContainer, err := getLogsOfContainer(k8s, pod, namespace, container.name)
if err != nil {
// In case we can't query the logs of a container, we append the reason instead of the container logs
logsOfContainer = fmt.Sprintf("Unable to query logs of container: %s", err.Error())
}

// If the container did not put out any logs, we skip it entirely to prevent polluting the
// log output too much by appending a lot of empty lines for each container
if logsOfContainer != "" {
logs.WriteString(buildLogOutputForContainer(pod, container, logsOfContainer))
logs.WriteString("\n")
}
}
logs += buf.String()
}

return logs, nil
return logs.String(), nil
}

const (
// Indicates that the container is an Init container
initContainerType = iota
// Indicates that the container is a container defined in the job workload
jobContainerType
)

type containerStatus struct {
name string
containerType int
status *v1.ContainerStateTerminated
}

// getLogsOfContainer returns the logs of a specific container inside the given pod
func getLogsOfContainer(k8s *K8sImpl, pod v1.Pod, namespace string, container string) (string, error) {

// Request logs of a specific container
req := k8s.clientset.CoreV1().Pods(namespace).GetLogs(pod.Name, &v1.PodLogOptions{
Container: container,
})

// Stream logs into a buffer
podLogs, err := req.Stream(context.TODO())
if err != nil {
return "", err
}

defer podLogs.Close()

// Convert the buffer into a string
buf := new(bytes.Buffer)
_, err = io.Copy(buf, podLogs)
if err != nil {
return "", err
}

return buf.String(), nil
}

// getTerminatedContainersWithStatusOfPod collects the terminated states of all containers inside a given Pod
func getTerminatedContainersWithStatusOfPod(pod v1.Pod) []containerStatus {
var containerStatusList []containerStatus

// Loop over all initContainers in the Pod spec and look at the appropriate
// InitContainerStatus index to determine the status of the init container
for index, initContainer := range pod.Spec.InitContainers {
if pod.Status.InitContainerStatuses[index].State.Terminated != nil {
containerStatusList = append(containerStatusList, containerStatus{
name: initContainer.Name,
containerType: initContainerType,
status: pod.Status.InitContainerStatuses[index].State.Terminated,
})
}
}

// Loop over all regular containers in the Pod spec and look at the appropriate
// ContainerStatus index to determine the status of the container
for index, container := range pod.Spec.Containers {
if pod.Status.ContainerStatuses[index].State.Terminated != nil {
containerStatusList = append(containerStatusList, containerStatus{
name: container.Name,
containerType: jobContainerType,
status: pod.Status.ContainerStatuses[index].State.Terminated,
})
}
}

return containerStatusList
}

// buildLogOutputForContainer generates a pretty output of the given logs and the container status in the following
// format. Depending on the status the output changes slightly (the output will be empty if no logs are produced):
//
// - Normal output:
// Container <container.name> of pod <pod.name>:
// <logsOfContainer>
//
// - In case of an error:
// Container <container.name> of pod <pod.name> terminated with an error (Reason: <reason> [, Message: <message> |, ExitCode: <code>]):
// <logsOfContainer>
//
func buildLogOutputForContainer(pod v1.Pod, container containerStatus, logsOfContainer string) string {
var logs strings.Builder

// Prepend the container name, so we are able to separate the logs of different containers, and display a
// termination error at the beginning, as it may be more interesting than the logs of the container
if container.status.Reason != "Completed" {
logs.WriteString("Container ")
logs.WriteString(container.name)
logs.WriteString(" of pod ")
logs.WriteString(pod.Name)
logs.WriteString(" terminated with an error (Reason: ")
logs.WriteString(container.status.Reason)

// Sometimes the message is not given; to provide prettier logs we print the exit code instead of an
// empty message part
if container.status.Message != "" {
logs.WriteString(", Message: ")
logs.WriteString(container.status.Message)
logs.WriteString(")")
} else {
logs.WriteString(", ExitCode: ")
logs.WriteString(fmt.Sprintf("%d", container.status.ExitCode))
logs.WriteString(")")
}

logs.WriteString(":\n")
} else {
logs.WriteString("Container ")
logs.WriteString(container.name)
logs.WriteString(" of pod ")
logs.WriteString(pod.Name)
logs.WriteString(":\n")
}

// Finally, append the actual logs of the container or a default message to the log
logs.WriteString(logsOfContainer)
logs.WriteString("\n")

return logs.String()
}
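As a minimal sketch of the resulting formatting, the snippet below shows what buildLogOutputForContainer would emit for a container that terminated with a non-Completed reason. It assumes it lives inside package k8sutils (for example as a test helper); the pod name, container name, and log line are hypothetical.

```go
package k8sutils

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
)

// sketchFailedContainerOutput prints the log block that buildLogOutputForContainer
// generates for a container that did not terminate with reason "Completed".
func sketchFailedContainerOutput() {
	pod := v1.Pod{}
	pod.Name = "job-executor-some-task-abcde" // hypothetical pod name

	failed := containerStatus{
		name:          "job-some-task", // hypothetical container name
		containerType: jobContainerType,
		status: &v1.ContainerStateTerminated{
			Reason:   "Error",
			ExitCode: 1,
		},
	}

	fmt.Print(buildLogOutputForContainer(pod, failed, "some task output"))
	// Prints, roughly:
	// Container job-some-task of pod job-executor-some-task-abcde terminated with an error (Reason: Error, ExitCode: 1):
	// some task output
}
```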
27 changes: 23 additions & 4 deletions pkg/k8sutils/pod_test.go
@@ -124,10 +124,10 @@ func TestGetLogsOfPodHappyPath(t *testing.T) {

logsOfPod, err := k8s.GetLogsOfPod(jobName, namespace)
assert.NoError(t, err)
assert.Equal(t, logsOfPod, "fake logs")
assert.Contains(t, logsOfPod, "fake logs")

// Assert that the fake received the call
getLogAction := k8stesting.GenericActionImpl{
getLogActionInitContainer := k8stesting.GenericActionImpl{
ActionImpl: k8stesting.ActionImpl{
Namespace: namespace,
Verb: "get",
@@ -138,8 +138,27 @@ func TestGetLogsOfPodHappyPath(t *testing.T) {
},
Subresource: "log",
},
Value: &v1.PodLogOptions{},
Value: &v1.PodLogOptions{
Container: initContainerName,
},
}

getLogActionContainer := k8stesting.GenericActionImpl{
ActionImpl: k8stesting.ActionImpl{
Namespace: namespace,
Verb: "get",
Resource: schema.GroupVersionResource{
Group: "",
Version: "v1",
Resource: "pods",
},
Subresource: "log",
},
Value: &v1.PodLogOptions{
Container: jobName,
},
}

assert.Contains(t, k8sClientSet.Actions(), getLogAction)
assert.Contains(t, k8sClientSet.Actions(), getLogActionInitContainer)
assert.Contains(t, k8sClientSet.Actions(), getLogActionContainer)
}