Skip to content

Commit

Permalink
[fix] Recursion caused by cycle step
Browse files Browse the repository at this point in the history
  • Loading branch information
inishchith committed Jan 10, 2024
1 parent af74fb7 commit 0c048d0
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 15 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ on:
branches:
- main
- test
- 112
- 112-cycle-patch
- dev-release*

defaults:
Expand Down
27 changes: 16 additions & 11 deletions pkg/apis/numaflow/v1alpha1/pipeline_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,18 +161,23 @@ func (p Pipeline) GetAllBuckets() []string {

// GetDownstreamEdges returns all the downstream edges of a vertex
func (p Pipeline) GetDownstreamEdges(vertexName string) []Edge {
var f func(vertexName string, edges *[]Edge)
f = func(vertexName string, edges *[]Edge) {
for _, b := range p.ListAllEdges() {
if b.From == vertexName {
*edges = append(*edges, b)
f(b.To, edges)
}
var f func(vertexName string, edges *[]Edge, visited *map[string]bool)
f = func(vertexName string, edges *[]Edge, visited *map[string]bool) {
if (*visited)[vertexName] {
return
}
}
result := []Edge{}
f(vertexName, &result)
return result
for _, b := range p.ListAllEdges() {
if b.From == vertexName {
*edges = append(*edges, b)
(*visited)[vertexName] = true
f(b.To, edges, visited)
}
}
}
result := []Edge{}
visited := make(map[string]bool)
f(vertexName, &result, &visited)
return result
}

// HasSideInputs returns if the pipeline has side inputs.
Expand Down
14 changes: 11 additions & 3 deletions pkg/apis/numaflow/v1alpha1/pipeline_types_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -301,18 +301,26 @@ func Test_GetDownstreamEdges(t *testing.T) {
Edges: []Edge{
{From: "input", To: "p1"},
{From: "p1", To: "p11"},
{From: "p1", To: "p1"},
{From: "p1", To: "p2"},
{From: "p2", To: "output"},
},
},
}
edges := pl.GetDownstreamEdges("input")
assert.Equal(t, 4, len(edges))
assert.Equal(t, 5, len(edges))
assert.Equal(t, edges, pl.ListAllEdges())
assert.Equal(t, edges[2], Edge{From: "p1", To: "p2"})
assert.Equal(t, edges[2], Edge{From: "p1", To: "p1"})
assert.Equal(t, edges[3], Edge{From: "p1", To: "p2"})

edges = pl.GetDownstreamEdges("p1")
assert.Equal(t, 3, len(edges))
assert.Equal(t, 4, len(edges))

edges = pl.GetDownstreamEdges("p2")
assert.Equal(t, 1, len(edges))

edges = pl.GetDownstreamEdges("p11")
assert.Equal(t, 0, len(edges))

edges = pl.GetDownstreamEdges("output")
assert.Equal(t, 0, len(edges))
Expand Down

0 comments on commit 0c048d0

Please sign in to comment.