
08 KisFlow Action

刘丹冰 edited this page Apr 17, 2024 · 1 revision

Case source code: https://github.com/aceld/kis-flow-usage/tree/main/7-flow_action

Inside a Function's FaaS callback, the value returned through flow.Next() controls what the Flow does after the current Function finishes.

1. No Action (Default Action)

If the FaaS callback has no specific action to take, it can end with return nil or return flow.Next() with no arguments. After the current Function finishes, the Flow automatically proceeds to the next Function layer and passes it the committed result data as usual, as shown below:

package main

import (
	"context"
	"fmt"
	"github.com/aceld/kis-flow/kis"
	"github.com/aceld/kis-flow/serialize"
)

type VerifyStuIn struct {
	serialize.DefaultSerialize
	StuScore
}

func VerifyStu(ctx context.Context, flow kis.Flow, rows []*VerifyStuIn) error {
	fmt.Printf("->Call Func VerifyStu\n")

	for _, stu := range rows {
		// Filter out invalid data
		if stu.StuId < 0 || stu.StuId > 999 {
			continue
		}

		_ = flow.CommitRow(stu)
	}

	return flow.Next()
}

2. Action Abort (Termination)

In the FaaS, you can terminate the current Flow process by using flow.Next(kis.ActionAbort).


Using the VerifyStu Function as an example, if the StuId of a source row falls outside the valid range (0 to 999), the current Flow is terminated immediately. The code is as follows:

faas_stu_verify.go

package main

import (
	"context"
	"fmt"
	"github.com/aceld/kis-flow/kis"
	"github.com/aceld/kis-flow/serialize"
)

type VerifyStuIn struct {
	serialize.DefaultSerialize
	StuScore
}

func VerifyStu(ctx context.Context, flow kis.Flow, rows []*VerifyStuIn) error {
	fmt.Printf("->Call Func VerifyStu\n")

	for _, stu := range rows {
		// Abort on the first invalid StuId instead of skipping the row
		if stu.StuId < 0 || stu.StuId > 999 {
			// Terminate the current Flow process, and the subsequent Functions in the current Flow will not be executed
			return flow.Next(kis.ActionAbort)
		}

		_ = flow.CommitRow(stu)
	}

	return flow.Next()
}

Output:

Add KisPool FuncName=VerifyStu
Add KisPool FuncName=AvgStuScore
Add KisPool FuncName=PrintStuAvgScore
Add FlowRouter FlowName=CalStuAvgScore
->Call Func VerifyStu

When a row with an invalid StuId is encountered, only the first Function, VerifyStu, is executed; the other two Functions never run.

3. Action DataReuse (Data Reuse)

If the current Function produces no calculation results, or if its results are identical to its source data, the FaaS can return flow.Next(kis.ActionDataReuse). The next Function layer then receives the same source data that the current layer received.


In the VerifyStu Function, if there is no invalid data, we can skip flow.CommitRow() entirely and instead return flow.Next(kis.ActionDataReuse) at the end of the FaaS. The next-layer Function, AvgStuScore(), then receives the same source data as VerifyStu().

faas_stu_verify.go

package main

import (
	"context"
	"fmt"
	"github.com/aceld/kis-flow/kis"
	"github.com/aceld/kis-flow/serialize"
)

type VerifyStuIn struct {
	serialize.DefaultSerialize
	StuScore
}

func VerifyStu(ctx context.Context, flow kis.Flow, rows []*VerifyStuIn) error {
	fmt.Printf("->Call Func VerifyStu\n")

	for _, stu := range rows {
		// Abort on the first invalid StuId; valid rows are not committed here
		if stu.StuId < 0 || stu.StuId > 999 {
			// Terminate the current Flow process, and the subsequent Functions in the current Flow will not be executed
			return flow.Next(kis.ActionAbort)
		}
	}

	return flow.Next(kis.ActionDataReuse)
}

Here VerifyStu(), the first-layer Function of the Flow, does not commit any result data.

main.go

func main() {
	// ... ...
	// ... ...

	// Submit the source data as JSON strings
	_ = flow1.CommitRow(`{"stu_id":101, "score_1":100, "score_2":90, "score_3":80}`)
	_ = flow1.CommitRow(`{"stu_id":102, "score_1":100, "score_2":90, "score_3":70}`)

	// Run the flow
	if err := flow1.Run(ctx); err != nil {
		fmt.Println("err: ", err)
	}

	// ... ...
	// ... ...
}

func init() {
	// Register functions
	kis.Pool().FaaS("VerifyStu", VerifyStu)
	kis.Pool().FaaS("AvgStuScore", AvgStuScore)
	kis.Pool().FaaS("PrintStuAvgScore", PrintStuAvgScore)
}

Output:

Add KisPool FuncName=VerifyStu
Add KisPool FuncName=AvgStuScore
Add KisPool FuncName=PrintStuAvgScore
Add FlowRouter FlowName=CalStuAvgScore
->Call Func VerifyStu
->Call Func AvgStuScore
->Call Func PrintStuAvgScore
stuid: [101], avg score: [90]
stuid: [102], avg score: [86.66666666666667]

In the main function, two data entries were submitted to flow1. Because VerifyStu reused its source data, AvgStuScore received the same two entries, so the final result still contains two entries.

4. Action ForceEntryNext (Force Entry to Next Layer)

By default, KisFlow terminates a Flow after the current Function when both of the following conditions hold:

  • The current Function has not submitted any data.
  • The current Function has not used flow.Next(kis.ActionDataReuse) to reuse the upper-layer result data as the result data of this layer.

In that case the current Function ends and the next layer is not scheduled for execution, because there is no data left and executing the subsequent Functions would be meaningless.

However, if the developer wants to enter the next layer even if there is no data, they can use flow.Next(kis.ActionForceEntryNext) to force entry into the next Function layer. Although no data will be passed to the next layer, the FaaS function of the next layer will be invoked.

faas_stu_verify.go

package main

import (
	"context"
	"fmt"
	"github.com/aceld/kis-flow/kis"
	"github.com/aceld/kis-flow/serialize"
)

type VerifyStuIn struct {
	serialize.DefaultSerialize
	StuScore
}

func VerifyStu(ctx context.Context, flow kis.Flow, rows []*VerifyStuIn) error {
	fmt.Printf("->Call Func VerifyStu\n")

	for _, stu := range rows {
		// Abort on the first invalid StuId; valid rows are not committed here
		if stu.StuId < 0 || stu.StuId > 999 {
			// Terminate the current Flow process, and the subsequent Functions in the current Flow will not be executed
			return flow.Next(kis.ActionAbort)
		}
	}

	// Even if no data is submitted, you can force entry into the next layer; otherwise, the flow will terminate at this layer
	return flow.Next(kis.ActionForceEntryNext)
}

This approach will allow the Flow to proceed to the next Function layer, even if no data is submitted from the current Function.
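The rules from sections 1 through 4 can be summarized in a small, library-free model. This is only a sketch of the behaviour described in the text, not KisFlow's actual scheduler: the Action type, its constants, and the nextLayer function are hypothetical names introduced for illustration, and the model does not cover edge cases such as reusing an empty input.

```go
package main

import "fmt"

// Action is a hypothetical stand-in for the value passed to flow.Next().
// It is NOT the kis.Action type from the library.
type Action int

const (
	ActionNone           Action = iota // return nil or flow.Next()
	ActionAbort                        // flow.Next(kis.ActionAbort)
	ActionDataReuse                    // flow.Next(kis.ActionDataReuse)
	ActionForceEntryNext               // flow.Next(kis.ActionForceEntryNext)
)

// nextLayer models whether the next Function runs and which rows it receives.
// input is what the current layer received; committed is what it submitted.
func nextLayer(input, committed []string, action Action) (rows []string, run bool) {
	switch action {
	case ActionAbort:
		// The Flow stops; no later Function runs.
		return nil, false
	case ActionDataReuse:
		// The next layer sees the current layer's source data.
		return input, true
	case ActionForceEntryNext:
		// The next layer runs even when nothing was committed.
		return committed, true
	default:
		// Default: continue only if the current layer committed data.
		return committed, len(committed) > 0
	}
}

func main() {
	input := []string{"stu-101", "stu-102"}
	cases := []struct {
		name      string
		committed []string
		action    Action
	}{
		{"default, rows committed", input, ActionNone},
		{"default, nothing committed", nil, ActionNone},
		{"abort", input, ActionAbort},
		{"data reuse", nil, ActionDataReuse},
		{"force entry next", nil, ActionForceEntryNext},
	}
	for _, c := range cases {
		rows, run := nextLayer(input, c.committed, c.action)
		fmt.Printf("%-28s run=%-5v rows=%d\n", c.name, run, len(rows))
	}
}
```

In this model, the only difference between the default action and ActionForceEntryNext appears when nothing was committed: the default stops the Flow, while the forced entry invokes the next layer with no rows.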

5. Action JumpFunc (Jump to a Specific Function Level)

KisFlow also provides the ability to jump to a specific Function to continue execution (use with caution, as it can easily lead to an infinite loop; ensure there are conditions to break the loop).


The following example demonstrates jumping back to Function: VerifyStu from the last Function: PrintStuAvgScore to execute it again.

faas_stu_score_avg_print.go

package main

import (
	"context"
	"fmt"
	"github.com/aceld/kis-flow/kis"
	"github.com/aceld/kis-flow/serialize"
)

type PrintStuAvgScoreIn struct {
	serialize.DefaultSerialize
	StuScoreAvg
}

type PrintStuAvgScoreOut struct {
	serialize.DefaultSerialize
}

func PrintStuAvgScore(ctx context.Context, flow kis.Flow, rows []*PrintStuAvgScoreIn) error {
	fmt.Printf("->Call Func PrintStuAvgScore\n")

	for _, row := range rows {
		fmt.Printf("stuid: [%+v], avg score: [%+v]\n", row.StuId, row.AvgScore)
	}

	// Jump to a specific Function
	return flow.Next(kis.ActionJumpFunc("VerifyStu"))
}

As written, this makes the current Flow loop indefinitely: every pass ends in PrintStuAvgScore, which jumps back to VerifyStu. To break the loop, add a condition that either exits the Flow or prevents it from reaching the jump in PrintStuAvgScore again.
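One way to bound such a loop is to count the jumps and stop jumping once a limit is reached. The sketch below illustrates that idea with a toy pipeline runner rather than with KisFlow itself: the step type, runPipeline, buildSteps, and the maxSteps safety budget are all hypothetical names introduced for this example, and only the stage names are borrowed from the tutorial.

```go
package main

import "fmt"

// step is a hypothetical pipeline stage. run returns the name of the stage
// to jump to, or "" to fall through to the next stage in order.
type step struct {
	name string
	run  func() (jumpTo string)
}

// runPipeline executes the steps in order, honouring jumps, and returns the
// names of the executed stages. maxSteps is a safety budget that turns an
// accidental infinite jump loop into an error.
func runPipeline(steps []step, maxSteps int) ([]string, error) {
	index := make(map[string]int, len(steps))
	for i, s := range steps {
		index[s.name] = i
	}

	var trace []string
	for i := 0; i < len(steps); {
		if len(trace) >= maxSteps {
			return trace, fmt.Errorf("gave up after %d steps: possible infinite jump loop", maxSteps)
		}
		s := steps[i]
		trace = append(trace, s.name)

		target := s.run()
		if target == "" {
			i++ // no jump: continue with the next stage
			continue
		}
		j, ok := index[target]
		if !ok {
			return trace, fmt.Errorf("unknown jump target %q", target)
		}
		i = j
	}
	return trace, nil
}

// buildSteps returns three stages named after the tutorial's Functions.
// The last stage jumps back to the first one at most maxJumps times.
func buildSteps(maxJumps int) []step {
	jumps := 0
	return []step{
		{"VerifyStu", func() string { return "" }},
		{"AvgStuScore", func() string { return "" }},
		{"PrintStuAvgScore", func() string {
			if jumps < maxJumps {
				jumps++
				return "VerifyStu" // guarded jump
			}
			return "" // limit reached: let the pipeline finish
		}},
	}
}

func main() {
	trace, err := runPipeline(buildSteps(1), 100)
	fmt.Println(trace, err)
}
```

The same guard carries over to a real FaaS function: keep a counter or another exit condition, return the jump action only while the condition holds, and return flow.Next() otherwise.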