Skip to content

03 KisFlow Commit

刘丹冰 edited this page Apr 16, 2024 · 1 revision

Case Source Code https://github.com/aceld/kis-flow-usage/tree/main/3-commit

1. Commit String text

main.go

package main

import (
	"context"
	"fmt"
	"github.com/aceld/kis-flow/file"
	"github.com/aceld/kis-flow/kis"
)

func main() {
	ctx := context.Background()

	// Load Configuration from file
	if err := file.ConfigImportYaml("conf/"); err != nil {
		panic(err)
	}

	// Get the flow
	flow1 := kis.Pool().GetFlow("CalStuAvgScore")
	if flow1 == nil {
		panic("flow1 is nil")
	}

	// Submit a string
	_ = flow1.CommitRow(`{"stu_id":101, "score_1":100, "score_2":90, "score_3":80}`)
	_ = flow1.CommitRow(`{"stu_id":102, "score_1":100, "score_2":70, "score_3":60}`)

	// Run the flow
	if err := flow1.Run(ctx); err != nil {
		fmt.Println("err: ", err)
	}

	return
}

func init() {
	// Register functions
	kis.Pool().FaaS("AvgStuScore", AvgStuScore)
	kis.Pool().FaaS("PrintStuAvgScore", PrintStuAvgScore)
}

Note: Currently, KisFlow's default string deserialization only supports the json text format by default. If developers want to add other formats, such as xml or other custom formats, they need to override the Serialize interface for the FaaS function's parameter input and output protocol.

For example:

type AvgStuScoreIn struct {
	serialize.DefaultSerialize  // Here, the default serialization scheme is used, developers can replace it
	StuId  int `json:"stu_id"`
	Score1 int `json:"score_1"`
	Score2 int `json:"score_2"`
	Score3 int `json:"score_3"`
}

2. Commit Struct

main.go

func main() {
	ctx := context.Background()

	// Load Configuration from file
	if err := file.ConfigImportYaml("conf/"); err != nil {
		panic(err)
	}

	// Get the flow
	flow1 := kis.Pool().GetFlow("CalStuAvgScore")
	if flow1 == nil {
		panic("flow1 is nil")
	}

        // Make a struct
	stu106 := AvgStuScoreIn{
		StuId:  106,
		Score1: 80,
		Score2: 81,
		Score3: 82,
	}

        // Commit
	_ = flow1.CommitRow(stu106)


	// Run the flow
	if err := flow1.Run(ctx); err != nil {
		fmt.Println("err: ", err)
	}

	return
}

3. Commit the Sturct Pointer

main.go

func main() {
	ctx := context.Background()

	// Load Configuration from file
	if err := file.ConfigImportYaml("conf/"); err != nil {
		panic(err)
	}

	// Get the flow
	flow1 := kis.Pool().GetFlow("CalStuAvgScore")
	if flow1 == nil {
		panic("flow1 is nil")
	}

        // Make a struct
	stu107 := AvgStuScoreIn{
		StuId:  107,
		Score1: 82,
		Score2: 83,
		Score3: 84,
	}

        // Commit 
	_ = flow1.CommitRow(&stu107)

	// Submit struct slice
	if err := flow1.CommitRowBatch(dataStructsPtr); err != nil {
		fmt.Println("err: ", err)
		return
	}

	// Run the flow
	if err := flow1.Run(ctx); err != nil {
		fmt.Println("err: ", err)
	}

	return
}

4. (Batch) Commit String text

main.go

package main

import (
	"context"
	"fmt"
	"github.com/aceld/kis-flow/file"
	"github.com/aceld/kis-flow/kis"
)

func main() {
	ctx := context.Background()

	// Load Configuration from file
	if err := file.ConfigImportYaml("conf/"); err != nil {
		panic(err)
	}

	// Get the flow
	flow1 := kis.Pool().GetFlow("CalStuAvgScore")
	if flow1 == nil {
		panic("flow1 is nil")
	}

	// data string slice
	dataStrings := []string{
		`{"stu_id":103, "score_1":100, "score_2":90, "score_3":80}`,
		`{"stu_id":104, "score_1":100, "score_2":70, "score_3":60}`,
		`{"stu_id":105, "score_1":80, "score_2":50, "score_3":50}`,
	}

	// Submit string slice
	if err := flow1.CommitRowBatch(dataStrings); err != nil {
		fmt.Println("err: ", err)
		return
	}
    
	// Run the flow
	if err := flow1.Run(ctx); err != nil {
		fmt.Println("err: ", err)
	}

	return
}

func init() {
	// Register functions
	kis.Pool().FaaS("AvgStuScore", AvgStuScore)
	kis.Pool().FaaS("PrintStuAvgScore", PrintStuAvgScore)
}

5. (Batch) Commit Struct

main.go

package main

import (
	"context"
	"fmt"
	"github.com/aceld/kis-flow/file"
	"github.com/aceld/kis-flow/kis"
)

func main() {
	ctx := context.Background()

	// Load Configuration from file
	if err := file.ConfigImportYaml("conf/"); err != nil {
		panic(err)
	}

	// Get the flow
	flow1 := kis.Pool().GetFlow("CalStuAvgScore")
	if flow1 == nil {
		panic("flow1 is nil")
	}


	dataStructs := []AvgStuScoreIn{
		{
			StuId:  108,
			Score1: 82,
			Score2: 83,
			Score3: 84,
		},
		{
			StuId:  109,
			Score1: 82,
			Score2: 83,
			Score3: 84,
		},
	}

	// Submit struct slice
	if err := flow1.CommitRowBatch(dataStructs); err != nil {
		fmt.Println("err: ", err)
		return
	}

	// Run the flow
	if err := flow1.Run(ctx); err != nil {
		fmt.Println("err: ", err)
	}

	return
}

func init() {
	// Register functions
	kis.Pool().FaaS("AvgStuScore", AvgStuScore)
	kis.Pool().FaaS("PrintStuAvgScore", PrintStuAvgScore)
}

6. (Batch) Commit the Sturct Pointer

main.go

package main

import (
	"context"
	"fmt"
	"github.com/aceld/kis-flow/file"
	"github.com/aceld/kis-flow/kis"
)

func main() {
	ctx := context.Background()

	// Load Configuration from file
	if err := file.ConfigImportYaml("conf/"); err != nil {
		panic(err)
	}

	// Get the flow
	flow1 := kis.Pool().GetFlow("CalStuAvgScore")
	if flow1 == nil {
		panic("flow1 is nil")
	}

	dataStructsPtr := []*AvgStuScoreIn{
		{
			StuId:  110,
			Score1: 82,
			Score2: 83,
			Score3: 84,
		},
		{
			StuId:  110,
			Score1: 82,
			Score2: 83,
			Score3: 84,
		},
	}

	// Submit struct slice
	if err := flow1.CommitRowBatch(dataStructsPtr); err != nil {
		fmt.Println("err: ", err)
		return
	}

	// Run the flow
	if err := flow1.Run(ctx); err != nil {
		fmt.Println("err: ", err)
	}

	return
}

func init() {
	// Register functions
	kis.Pool().FaaS("AvgStuScore", AvgStuScore)
	kis.Pool().FaaS("PrintStuAvgScore", PrintStuAvgScore)
}