Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bugfix/picking changes to allow for failover polling #6523

100 changes: 80 additions & 20 deletions .gen/go/history/history.go

Large diffs are not rendered by default.

80 changes: 76 additions & 4 deletions .gen/go/matching/matching.go

Large diffs are not rendered by default.

418 changes: 412 additions & 6 deletions .gen/go/shared/shared.go

Large diffs are not rendered by default.

965 changes: 513 additions & 452 deletions .gen/proto/history/v1/service.pb.go

Large diffs are not rendered by default.

877 changes: 442 additions & 435 deletions .gen/proto/history/v1/service.pb.yarpc.go

Large diffs are not rendered by default.

256 changes: 131 additions & 125 deletions .gen/proto/matching/v1/service.pb.yarpc.go

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion cmd/server/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ require (
github.com/startreedata/pinot-client-go v0.2.0 // latest release supports pinot v0.12.0 which is also internal version
github.com/stretchr/testify v1.8.3
github.com/uber-go/tally v3.3.15+incompatible // indirect
github.com/uber/cadence-idl v0.0.0-20241118185545-0ff09166fc7c
github.com/uber/cadence-idl v0.0.0-20241126065313-57bd6876d48f
github.com/uber/ringpop-go v0.8.5 // indirect
github.com/uber/tchannel-go v1.22.2 // indirect
github.com/valyala/fastjson v1.4.1 // indirect
Expand Down
4 changes: 2 additions & 2 deletions cmd/server/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -407,8 +407,8 @@ github.com/uber-go/tally v3.3.12+incompatible/go.mod h1:YDTIBxdXyOU/sCWilKB4bgyu
github.com/uber-go/tally v3.3.15+incompatible h1:9hLSgNBP28CjIaDmAuRTq9qV+UZY+9PcvAkXO4nNMwg=
github.com/uber-go/tally v3.3.15+incompatible/go.mod h1:YDTIBxdXyOU/sCWilKB4bgyufu1cEi0jdVnRdxvjnmU=
github.com/uber/cadence-idl v0.0.0-20211111101836-d6b70b60eb8c/go.mod h1:oyUK7GCNCRHCCyWyzifSzXpVrRYVBbAMHAzF5dXiKws=
github.com/uber/cadence-idl v0.0.0-20241118185545-0ff09166fc7c h1:sagx8l5XOlJWlwwflrxsxlYXgsgyr1Jpe2eXl7q5Vic=
github.com/uber/cadence-idl v0.0.0-20241118185545-0ff09166fc7c/go.mod h1:oyUK7GCNCRHCCyWyzifSzXpVrRYVBbAMHAzF5dXiKws=
github.com/uber/cadence-idl v0.0.0-20241126065313-57bd6876d48f h1:U2nI6IKh80rrueDb2G3wuhCkCHYCsLp9EFBazeTs7Dk=
github.com/uber/cadence-idl v0.0.0-20241126065313-57bd6876d48f/go.mod h1:oyUK7GCNCRHCCyWyzifSzXpVrRYVBbAMHAzF5dXiKws=
github.com/uber/jaeger-client-go v2.22.1+incompatible h1:NHcubEkVbahf9t3p75TOCR83gdUHXjRJvjoBh1yACsM=
github.com/uber/jaeger-client-go v2.22.1+incompatible/go.mod h1:WVhlPFC8FDjOFMMWRy2pZqQJSXxYSwNYOkTr/Z6d3Kk=
github.com/uber/jaeger-lib v2.2.0+incompatible h1:MxZXOiR2JuoANZ3J6DE/U0kSFv/eJ/GfSYVCjK7dyaw=
Expand Down
27 changes: 27 additions & 0 deletions common/persistence/data_manager_interfaces_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -547,3 +547,30 @@ func TestTaskListPartitionConfigToInternalType(t *testing.T) {
})
}
}

func TestVersionHistoryCopy(t *testing.T) {
a := VersionHistories{
CurrentVersionHistoryIndex: 1,
Histories: []*VersionHistory{
{
BranchToken: []byte("token"),
Items: []*VersionHistoryItem{
{
EventID: 1,
Version: 2,
},
},
},
{
BranchToken: []byte("token"),
Items: []*VersionHistoryItem{
{
EventID: 1,
Version: 2,
},
},
},
},
}
assert.Equal(t, &a, a.Duplicate())
}
11 changes: 6 additions & 5 deletions common/persistence/versionHistory.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,9 @@ func (item *VersionHistoryItem) Duplicate() *VersionHistoryItem {

// ToInternalType return internal format of version history item
func (item *VersionHistoryItem) ToInternalType() *types.VersionHistoryItem {

if item == nil {
return nil
}
return &types.VersionHistoryItem{
EventID: item.EventID,
Version: item.Version,
Expand Down Expand Up @@ -229,7 +231,6 @@ func (v *VersionHistory) AddOrUpdateItem(
func (v *VersionHistory) ContainsItem(
item *VersionHistoryItem,
) bool {

prevEventID := common.FirstEventID - 1
for _, currentItem := range v.Items {
if item.Version == currentItem.Version {
Expand Down Expand Up @@ -301,11 +302,9 @@ func (v *VersionHistory) GetFirstItem() (*VersionHistoryItem, error) {

// GetLastItem return the last version history item
func (v *VersionHistory) GetLastItem() (*VersionHistoryItem, error) {

if len(v.Items) == 0 {
return nil, &types.BadRequestError{Message: "version history is empty"}
}

return v.Items[len(v.Items)-1].Duplicate(), nil
}

Expand Down Expand Up @@ -406,6 +405,9 @@ func NewVersionHistoriesFromInternalType(

// Duplicate duplicate VersionHistories
func (h *VersionHistories) Duplicate() *VersionHistories {
if h == nil {
return nil
}

currentVersionHistoryIndex := h.CurrentVersionHistoryIndex
histories := []*VersionHistory{}
Expand Down Expand Up @@ -590,6 +592,5 @@ func (h *VersionHistories) GetCurrentVersionHistoryIndex() int {

// GetCurrentVersionHistory get the current version history
func (h *VersionHistories) GetCurrentVersionHistory() (*VersionHistory, error) {

return h.GetVersionHistory(h.GetCurrentVersionHistoryIndex())
}
31 changes: 20 additions & 11 deletions common/types/history.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,10 +100,11 @@ func (v *FailoverMarkerToken) GetFailoverMarker() (o *FailoverMarkerAttributes)

// GetMutableStateRequest is an internal type (TBD...)
type GetMutableStateRequest struct {
DomainUUID string `json:"domainUUID,omitempty"`
Execution *WorkflowExecution `json:"execution,omitempty"`
ExpectedNextEventID int64 `json:"expectedNextEventId,omitempty"`
CurrentBranchToken []byte `json:"currentBranchToken,omitempty"`
DomainUUID string `json:"domainUUID,omitempty"`
Execution *WorkflowExecution `json:"execution,omitempty"`
ExpectedNextEventID int64 `json:"expectedNextEventId,omitempty"`
CurrentBranchToken []byte `json:"currentBranchToken,omitempty"`
VersionHistoryItem *VersionHistoryItem `json:"versionHistoryItem,omitempty"`
}

// GetDomainUUID is an internal getter (TBD...)
Expand Down Expand Up @@ -272,16 +273,24 @@ func (v *ParentExecutionInfo) GetExecution() (o *WorkflowExecution) {

// PollMutableStateRequest is an internal type (TBD...)
type PollMutableStateRequest struct {
DomainUUID string `json:"domainUUID,omitempty"`
Execution *WorkflowExecution `json:"execution,omitempty"`
ExpectedNextEventID int64 `json:"expectedNextEventId,omitempty"`
CurrentBranchToken []byte `json:"currentBranchToken,omitempty"`
DomainUUID string `json:"domainUUID,omitempty"`
Execution *WorkflowExecution `json:"execution,omitempty"`
ExpectedNextEventID int64 `json:"expectedNextEventId,omitempty"`
CurrentBranchToken []byte `json:"currentBranchToken,omitempty"`
VersionHistoryItem *VersionHistoryItem `json:"versionHistoryItem,omitempty"`
}

func (p *PollMutableStateRequest) GetVersionHistoryItem() *VersionHistoryItem {
if p.VersionHistoryItem == nil {
return nil
}
return p.VersionHistoryItem
}

// GetDomainUUID is an internal getter (TBD...)
func (v *PollMutableStateRequest) GetDomainUUID() (o string) {
if v != nil {
return v.DomainUUID
func (p *PollMutableStateRequest) GetDomainUUID() (o string) {
if p != nil {
return p.DomainUUID
}
return
}
Expand Down
19 changes: 19 additions & 0 deletions common/types/mapper/proto/history.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
package proto

import (
adminv1 "github.com/uber/cadence-idl/go/proto/admin/v1"
apiv1 "github.com/uber/cadence-idl/go/proto/api/v1"

historyv1 "github.com/uber/cadence/.gen/proto/history/v1"
Expand Down Expand Up @@ -292,23 +293,41 @@ func FromHistoryGetMutableStateRequest(t *types.GetMutableStateRequest) *history
if t == nil {
return nil
}

var versionHistoryItem *adminv1.VersionHistoryItem
if t.VersionHistoryItem != nil {
davidporter-id-au marked this conversation as resolved.
Show resolved Hide resolved
versionHistoryItem = &adminv1.VersionHistoryItem{
EventId: t.VersionHistoryItem.EventID,
Version: t.VersionHistoryItem.Version,
}
}

return &historyv1.GetMutableStateRequest{
DomainId: t.DomainUUID,
WorkflowExecution: FromWorkflowExecution(t.Execution),
ExpectedNextEventId: t.ExpectedNextEventID,
CurrentBranchToken: t.CurrentBranchToken,
VersionHistoryItem: versionHistoryItem,
}
}

func ToHistoryGetMutableStateRequest(t *historyv1.GetMutableStateRequest) *types.GetMutableStateRequest {
if t == nil {
return nil
}
var versionHistoryItem *types.VersionHistoryItem
if t.VersionHistoryItem != nil {
versionHistoryItem = &types.VersionHistoryItem{
EventID: t.VersionHistoryItem.EventId,
Version: t.VersionHistoryItem.Version,
}
}
return &types.GetMutableStateRequest{
DomainUUID: t.DomainId,
Execution: ToWorkflowExecution(t.WorkflowExecution),
ExpectedNextEventID: t.ExpectedNextEventId,
CurrentBranchToken: t.CurrentBranchToken,
VersionHistoryItem: versionHistoryItem,
}
}

Expand Down
24 changes: 24 additions & 0 deletions common/types/mapper/thrift/history.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ package thrift

import (
"github.com/uber/cadence/.gen/go/history"
"github.com/uber/cadence/.gen/go/shared"
"github.com/uber/cadence/common/types"
)

Expand Down Expand Up @@ -227,11 +228,21 @@ func FromHistoryGetMutableStateRequest(t *types.GetMutableStateRequest) *history
if t == nil {
return nil
}

var versionHistoryItem *shared.VersionHistoryItem
if t.VersionHistoryItem != nil {
versionHistoryItem = &shared.VersionHistoryItem{
EventID: &t.VersionHistoryItem.EventID,
Version: &t.VersionHistoryItem.Version,
}
}

return &history.GetMutableStateRequest{
DomainUUID: &t.DomainUUID,
Execution: FromWorkflowExecution(t.Execution),
ExpectedNextEventId: &t.ExpectedNextEventID,
CurrentBranchToken: t.CurrentBranchToken,
VersionHistoryItem: versionHistoryItem,
}
}

Expand All @@ -240,11 +251,24 @@ func ToHistoryGetMutableStateRequest(t *history.GetMutableStateRequest) *types.G
if t == nil {
return nil
}

var versionHistoryItem *types.VersionHistoryItem
if t.VersionHistoryItem != nil {
versionHistoryItem = &types.VersionHistoryItem{}
if t.VersionHistoryItem.EventID != nil {
versionHistoryItem.EventID = *t.VersionHistoryItem.EventID
}
if t.VersionHistoryItem.Version != nil {
versionHistoryItem.Version = *t.VersionHistoryItem.Version
}
}

return &types.GetMutableStateRequest{
DomainUUID: t.GetDomainUUID(),
Execution: ToWorkflowExecution(t.Execution),
ExpectedNextEventID: t.GetExpectedNextEventId(),
CurrentBranchToken: t.CurrentBranchToken,
VersionHistoryItem: versionHistoryItem,
}
}

Expand Down
1 change: 1 addition & 0 deletions common/types/testdata/service_history.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ var (
Execution: &WorkflowExecution,
ExpectedNextEventID: EventID1,
CurrentBranchToken: BranchToken,
VersionHistoryItem: &VersionHistoryItem,
}
HistoryGetMutableStateResponse = types.GetMutableStateResponse{
Execution: &WorkflowExecution,
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ require (
github.com/startreedata/pinot-client-go v0.2.0 // latest release supports pinot v0.12.0 which is also internal version
github.com/stretchr/testify v1.8.3
github.com/uber-go/tally v3.3.15+incompatible
github.com/uber/cadence-idl v0.0.0-20241118185545-0ff09166fc7c
github.com/uber/cadence-idl v0.0.0-20241126065313-57bd6876d48f
github.com/uber/ringpop-go v0.8.5
github.com/uber/tchannel-go v1.22.2
github.com/urfave/cli/v2 v2.27.4
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -445,8 +445,8 @@ github.com/uber-go/tally v3.3.12+incompatible/go.mod h1:YDTIBxdXyOU/sCWilKB4bgyu
github.com/uber-go/tally v3.3.15+incompatible h1:9hLSgNBP28CjIaDmAuRTq9qV+UZY+9PcvAkXO4nNMwg=
github.com/uber-go/tally v3.3.15+incompatible/go.mod h1:YDTIBxdXyOU/sCWilKB4bgyufu1cEi0jdVnRdxvjnmU=
github.com/uber/cadence-idl v0.0.0-20211111101836-d6b70b60eb8c/go.mod h1:oyUK7GCNCRHCCyWyzifSzXpVrRYVBbAMHAzF5dXiKws=
github.com/uber/cadence-idl v0.0.0-20241118185545-0ff09166fc7c h1:sagx8l5XOlJWlwwflrxsxlYXgsgyr1Jpe2eXl7q5Vic=
github.com/uber/cadence-idl v0.0.0-20241118185545-0ff09166fc7c/go.mod h1:oyUK7GCNCRHCCyWyzifSzXpVrRYVBbAMHAzF5dXiKws=
github.com/uber/cadence-idl v0.0.0-20241126065313-57bd6876d48f h1:U2nI6IKh80rrueDb2G3wuhCkCHYCsLp9EFBazeTs7Dk=
github.com/uber/cadence-idl v0.0.0-20241126065313-57bd6876d48f/go.mod h1:oyUK7GCNCRHCCyWyzifSzXpVrRYVBbAMHAzF5dXiKws=
github.com/uber/jaeger-client-go v2.22.1+incompatible h1:NHcubEkVbahf9t3p75TOCR83gdUHXjRJvjoBh1yACsM=
github.com/uber/jaeger-client-go v2.22.1+incompatible/go.mod h1:WVhlPFC8FDjOFMMWRy2pZqQJSXxYSwNYOkTr/Z6d3Kk=
github.com/uber/jaeger-lib v2.2.0+incompatible h1:MxZXOiR2JuoANZ3J6DE/U0kSFv/eJ/GfSYVCjK7dyaw=
Expand Down
2 changes: 1 addition & 1 deletion idls
Submodule idls updated from 0ff091 to 57bd68
2 changes: 2 additions & 0 deletions proto/internal/uber/cadence/history/v1/service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -339,6 +339,7 @@ message GetMutableStateRequest {
api.v1.WorkflowExecution workflow_execution = 2;
int64 expected_next_event_id = 3;
bytes current_branch_token = 4;
admin.v1.VersionHistoryItem version_history_item = 5;
}

message GetMutableStateResponse {
Expand All @@ -360,6 +361,7 @@ message GetMutableStateResponse {
shared.v1.VersionHistories version_histories = 16;
bool is_sticky_task_list_enabled = 17;
int64 history_size = 18;

}

message PollMutableStateRequest {
Expand Down
47 changes: 33 additions & 14 deletions service/frontend/api/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,13 +87,14 @@ type (
}

getHistoryContinuationToken struct {
RunID string
FirstEventID int64
NextEventID int64
IsWorkflowRunning bool
PersistenceToken []byte
TransientDecision *types.TransientDecisionInfo
BranchToken []byte
RunID string
FirstEventID int64
NextEventID int64
IsWorkflowRunning bool
PersistenceToken []byte
TransientDecision *types.TransientDecisionInfo
BranchToken []byte
VersionHistoryItem *types.VersionHistoryItem
}

domainGetter interface {
Expand Down Expand Up @@ -1866,30 +1867,46 @@ func (wh *WorkflowHandler) GetWorkflowExecutionHistory(
// 3. the last first event ID (the event ID of the last batch of events in the history)
// 4. the next event ID
// 5. whether the workflow is closed
// 6. error if any
// 6 The version history
// 7. error if any
queryHistory := func(
domainUUID string,
execution *types.WorkflowExecution,
expectedNextEventID int64,
currentBranchToken []byte,
) ([]byte, string, int64, int64, bool, error) {
versionHistoryItem *persistence.VersionHistoryItem,
) ([]byte, string, int64, int64, bool, *types.VersionHistoryItem, error) {

response, err := wh.GetHistoryClient().PollMutableState(ctx, &types.PollMutableStateRequest{
DomainUUID: domainUUID,
Execution: execution,
ExpectedNextEventID: expectedNextEventID,
CurrentBranchToken: currentBranchToken,
VersionHistoryItem: versionHistoryItem.ToInternalType(),
})

if err != nil {
return nil, "", 0, 0, false, err
return nil, "", 0, 0, false, nil, err
}

isWorkflowRunning := response.GetWorkflowCloseState() == persistence.WorkflowCloseStatusNone
currentVersionHistory, err := persistence.NewVersionHistoriesFromInternalType(response.VersionHistories).GetCurrentVersionHistory()
if err != nil {
wh.GetLogger().Error("Failed to get current version history", tag.Dynamic("version-histories", response.VersionHistories))
return nil, "", 0, 0, false, nil, fmt.Errorf("failed to get the current version from the response from history: %w", err)
}

lastVersionHistoryItem, err := currentVersionHistory.GetLastItem()
if err != nil {
return nil, "", 0, 0, false, nil, err
}

return response.CurrentBranchToken,
response.Execution.GetRunID(),
response.GetLastFirstEventID(),
response.GetNextEventID(),
isWorkflowRunning,
lastVersionHistoryItem.ToInternalType(),
nil
}

Expand Down Expand Up @@ -1931,8 +1948,10 @@ func (wh *WorkflowHandler) GetWorkflowExecutionHistory(
if !isCloseEventOnly {
queryNextEventID = token.NextEventID
}
token.BranchToken, _, lastFirstEventID, nextEventID, isWorkflowRunning, err =
queryHistory(domainID, execution, queryNextEventID, token.BranchToken)

vh := persistence.NewVersionHistoryItemFromInternalType(token.VersionHistoryItem)
token.BranchToken, _, lastFirstEventID, nextEventID, isWorkflowRunning, token.VersionHistoryItem, err =
queryHistory(domainID, execution, queryNextEventID, token.BranchToken, vh)
if err != nil {
return nil, err
}
Expand All @@ -1944,8 +1963,8 @@ func (wh *WorkflowHandler) GetWorkflowExecutionHistory(
if !isCloseEventOnly {
queryNextEventID = common.FirstEventID
}
token.BranchToken, runID, lastFirstEventID, nextEventID, isWorkflowRunning, err =
queryHistory(domainID, execution, queryNextEventID, nil)
token.BranchToken, runID, lastFirstEventID, nextEventID, isWorkflowRunning, token.VersionHistoryItem, err =
queryHistory(domainID, execution, queryNextEventID, nil, nil)
if err != nil {
return nil, err
}
Expand Down
Loading