Skip to content

Commit

Permalink
[Wf-Diagnostics] add retry policy validation to diagnostics (#6529)
Browse files Browse the repository at this point in the history
  • Loading branch information
sankari165 authored Nov 29, 2024
1 parent fcd56c6 commit eacb093
Show file tree
Hide file tree
Showing 3 changed files with 134 additions and 20 deletions.
63 changes: 50 additions & 13 deletions service/worker/diagnostics/invariant/retry/retry.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,20 +50,44 @@ func NewInvariant(p Params) Retry {
func (r *retry) Check(context.Context) ([]invariant.InvariantCheckResult, error) {
result := make([]invariant.InvariantCheckResult, 0)
events := r.workflowExecutionHistory.GetHistory().GetEvents()

lastEvent := fetchContinuedAsNewEvent(events)
startedEvent := fetchStartedEvent(events)
if lastEvent != nil && startedEvent != nil {
if startedEvent.RetryPolicy != nil {
result = append(result, invariant.InvariantCheckResult{
InvariantType: WorkflowRetry.String(),
Reason: failure.ErrorTypeFromReason(*lastEvent.FailureReason).String(),
Metadata: invariant.MarshalData(RetryMetadata{
RetryPolicy: startedEvent.RetryPolicy,
Attempt: startedEvent.Attempt,
}),
})
startedEvent := fetchWfStartedEvent(events)
if lastEvent != nil && startedEvent != nil && startedEvent.RetryPolicy != nil {
result = append(result, invariant.InvariantCheckResult{
InvariantType: WorkflowRetryInfo.String(),
Reason: failure.ErrorTypeFromReason(*lastEvent.FailureReason).String(),
Metadata: invariant.MarshalData(RetryMetadata{
RetryPolicy: startedEvent.RetryPolicy,
}),
})
}

if issue := checkRetryPolicy(startedEvent.RetryPolicy); issue != "" {
result = append(result, invariant.InvariantCheckResult{
InvariantType: WorkflowRetryIssue.String(),
Reason: issue.String(),
Metadata: invariant.MarshalData(RetryMetadata{
RetryPolicy: startedEvent.RetryPolicy,
}),
})
}

for _, event := range events {
if event.GetActivityTaskScheduledEventAttributes() != nil {
attr := event.GetActivityTaskScheduledEventAttributes()
if issue := checkRetryPolicy(attr.RetryPolicy); issue != "" {
result = append(result, invariant.InvariantCheckResult{
InvariantType: ActivityRetryIssue.String(),
Reason: issue.String(),
Metadata: invariant.MarshalData(RetryMetadata{
RetryPolicy: attr.RetryPolicy,
}),
})
}
}
}

return result, nil
}

Expand All @@ -76,7 +100,7 @@ func fetchContinuedAsNewEvent(events []*types.HistoryEvent) *types.WorkflowExecu
return nil
}

func fetchStartedEvent(events []*types.HistoryEvent) *types.WorkflowExecutionStartedEventAttributes {
func fetchWfStartedEvent(events []*types.HistoryEvent) *types.WorkflowExecutionStartedEventAttributes {
for _, event := range events {
if event.GetWorkflowExecutionStartedEventAttributes() != nil {
return event.GetWorkflowExecutionStartedEventAttributes()
Expand All @@ -85,9 +109,22 @@ func fetchStartedEvent(events []*types.HistoryEvent) *types.WorkflowExecutionSta
return nil
}

// checkRetryPolicy validates a retry policy and returns the issue it finds, or
// the empty IssueType when the policy is nil or nothing is flagged.
//
// Two configurations are flagged:
//   - RetryPolicyValidationMaxAttempts: no expiration interval is set and
//     MaximumAttempts is 1;
//   - RetryPolicyValidationExpInterval: MaximumAttempts is unset and the
//     expiration interval is shorter than the initial interval.
func checkRetryPolicy(policy *types.RetryPolicy) IssueType {
	if policy == nil {
		return ""
	}

	maxAttempts := policy.GetMaximumAttempts()
	expiration := policy.GetExpirationIntervalInSeconds()

	switch {
	case maxAttempts == 1 && expiration == 0:
		return RetryPolicyValidationMaxAttempts
	case maxAttempts == 0 && expiration < policy.GetInitialIntervalInSeconds():
		return RetryPolicyValidationExpInterval
	default:
		return ""
	}
}

// RootCause is intentionally a no-op: this invariant has no separate root-cause
// analysis because the issues identified in Check() are themselves the root cause.
// It always returns an empty, non-nil slice and a nil error.
func (r *retry) RootCause(ctx context.Context, issues []invariant.InvariantCheckResult) ([]invariant.InvariantRootCauseResult, error) {
	return []invariant.InvariantRootCauseResult{}, nil
}
75 changes: 70 additions & 5 deletions service/worker/diagnostics/invariant/retry/retry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,14 +35,29 @@ import (
)

func Test__Check(t *testing.T) {
metadata := RetryMetadata{
retriedWfMetadata := RetryMetadata{
RetryPolicy: &types.RetryPolicy{
InitialIntervalInSeconds: 1,
MaximumAttempts: 2,
},
Attempt: 0,
}
metadataInBytes, err := json.Marshal(metadata)
retriedWfMetadataInBytes, err := json.Marshal(retriedWfMetadata)
require.NoError(t, err)
invalidAttemptsMetadata := RetryMetadata{
RetryPolicy: &types.RetryPolicy{
InitialIntervalInSeconds: 1,
MaximumAttempts: 1,
},
}
invalidAttemptsMetadataInBytes, err := json.Marshal(invalidAttemptsMetadata)
require.NoError(t, err)
invalidExpIntervalMetadata := RetryMetadata{
RetryPolicy: &types.RetryPolicy{
InitialIntervalInSeconds: 10,
ExpirationIntervalInSeconds: 5,
},
}
invalidExpIntervalMetadataInBytes, err := json.Marshal(invalidExpIntervalMetadata)
require.NoError(t, err)
testCases := []struct {
name string
Expand All @@ -55,9 +70,26 @@ func Test__Check(t *testing.T) {
testData: retriedWfHistory(),
expectedResult: []invariant.InvariantCheckResult{
{
InvariantType: WorkflowRetry.String(),
InvariantType: WorkflowRetryInfo.String(),
Reason: "The failure is caused by a timeout during the execution",
Metadata: metadataInBytes,
Metadata: retriedWfMetadataInBytes,
},
},
err: nil,
},
{
name: "invalid retry policy",
testData: invalidRetryPolicyWfHistory(),
expectedResult: []invariant.InvariantCheckResult{
{
InvariantType: ActivityRetryIssue.String(),
Reason: RetryPolicyValidationMaxAttempts.String(),
Metadata: invalidAttemptsMetadataInBytes,
},
{
InvariantType: WorkflowRetryIssue.String(),
Reason: RetryPolicyValidationExpInterval.String(),
Metadata: invalidExpIntervalMetadataInBytes,
},
},
err: nil,
Expand Down Expand Up @@ -98,3 +130,36 @@ func retriedWfHistory() *types.GetWorkflowExecutionHistoryResponse {
},
}
}

// invalidRetryPolicyWfHistory builds a history fixture with three events:
// a workflow started event whose retry policy has an expiration interval (5s)
// shorter than its initial interval (10s), an activity scheduled with
// MaximumAttempts set to 1, and an activity scheduled without any retry policy.
func invalidRetryPolicyWfHistory() *types.GetWorkflowExecutionHistoryResponse {
	workflowStarted := &types.HistoryEvent{
		ID: 1,
		WorkflowExecutionStartedEventAttributes: &types.WorkflowExecutionStartedEventAttributes{
			RetryPolicy: &types.RetryPolicy{
				InitialIntervalInSeconds:    10,
				ExpirationIntervalInSeconds: 5,
			},
		},
	}

	activitySingleAttempt := &types.HistoryEvent{
		ID: 5,
		ActivityTaskScheduledEventAttributes: &types.ActivityTaskScheduledEventAttributes{
			RetryPolicy: &types.RetryPolicy{
				InitialIntervalInSeconds: 1,
				MaximumAttempts:          1,
			},
		},
	}

	// The retry policy is deliberately left nil on this activity.
	activityNoPolicy := &types.HistoryEvent{
		ID:                                   6,
		ActivityTaskScheduledEventAttributes: &types.ActivityTaskScheduledEventAttributes{},
	}

	history := &types.History{
		Events: []*types.HistoryEvent{
			workflowStarted,
			activitySingleAttempt,
			activityNoPolicy,
		},
	}
	return &types.GetWorkflowExecutionHistoryResponse{History: history}
}
16 changes: 14 additions & 2 deletions service/worker/diagnostics/invariant/retry/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,26 @@ import "github.com/uber/cadence/common/types"
// RetryType is the string label used as the InvariantType of results produced
// by the retry invariant.
type RetryType string

// Labels for the kinds of retry findings.
const (
	// WorkflowRetry is the original, generic label.
	// NOTE(review): appears superseded by WorkflowRetryInfo — confirm before removing.
	WorkflowRetry = RetryType("Workflow Retry")
	// WorkflowRetryInfo labels a workflow retry policy that was configured and applied.
	WorkflowRetryInfo = RetryType("Workflow Retry configured and applied")
	// WorkflowRetryIssue labels a workflow retry policy that is configured but invalid.
	WorkflowRetryIssue = RetryType("Workflow Retry configured but invalid")
	// ActivityRetryIssue labels an activity retry policy that is configured but invalid.
	ActivityRetryIssue = RetryType("Activity Retry configured but invalid")
)

// String returns the label as a plain string.
func (r RetryType) String() string {
	label := string(r)
	return label
}

// IssueType is the human-readable description of a retry policy validation
// problem; the empty value means no issue.
type IssueType string

// Validation problems that can be reported for a retry policy.
const (
	// RetryPolicyValidationMaxAttempts reports a MaximumAttempts value of 1.
	RetryPolicyValidationMaxAttempts = IssueType("MaximumAttempts set to 1 will not retry since maximum attempts includes the first attempt.")
	// RetryPolicyValidationExpInterval reports an expiration interval shorter
	// than the initial interval.
	RetryPolicyValidationExpInterval = IssueType("ExpirationIntervalInSeconds less than InitialIntervalInSeconds will not retry.")
)

// String returns the issue description as a plain string.
func (i IssueType) String() string {
	text := string(i)
	return text
}

// RetryMetadata is the payload the retry invariant passes to
// invariant.MarshalData to fill the Metadata field of its check results.
type RetryMetadata struct {
// RetryPolicy is the retry policy the check result refers to.
RetryPolicy *types.RetryPolicy
// Attempt is an attempt counter for the workflow.
// NOTE(review): confirm whether Check still populates this field.
Attempt int32
}

0 comments on commit eacb093

Please sign in to comment.