Skip to content

10 KisFlow FaaS Data Serialize

刘丹冰 edited this page Apr 17, 2024 · 1 revision

Case Source Code https://github.com/aceld/kis-flow-usage/tree/main/9-serialize

10.1 Serialize Interface

// Serialize data serialization interface
type Serialize interface {
	// UnMarshal is used to deserialize KisRowArr into a specified type of value.
	UnMarshal(common.KisRowArr, reflect.Type) (reflect.Value, error)
	// Marshal is used to serialize a specified type of value into KisRowArr.
	Marshal(interface{}) (common.KisRowArr, error)
}

10.2 KisFlow Default Serialize

KisFlow provides default data serialization (supports Json strings, Go structs, etc.):

type DefaultSerialize struct{}

// UnMarshal is used to deserialize KisRowArr into a specified type of value.
func (f *DefaultSerialize) UnMarshal(arr common.KisRowArr, r reflect.Type) (reflect.Value, error) {
	// Ensure the passed type is a slice
	if r.Kind() != reflect.Slice {
		return reflect.Value{}, fmt.Errorf("r must be a slice")
	}

	slice := reflect.MakeSlice(r, 0, len(arr))

	// Iterate over each element and attempt to deserialize
	for _, row := range arr {
		var elem reflect.Value
		var err error

		// Attempt to assert as struct or pointer
		elem, err = unMarshalStruct(row, r.Elem())
		if err == nil {
			slice = reflect.Append(slice, elem)
			continue
		}

		// Attempt to directly deserialize string
		elem, err = unMarshalJsonString(row, r.Elem())
		if err == nil {
			slice = reflect.Append(slice, elem)
			continue
		}

		// Attempt to first serialize to JSON and then deserialize
		elem, err = unMarshalJsonStruct(row, r.Elem())
		if err == nil {
			slice = reflect.Append(slice, elem)
		} else {
			return reflect.Value{}, fmt.Errorf("failed to decode row: %v", err)
		}
	}

	return slice, nil
}

// Attempt to assert as struct or pointer
func unMarshalStruct(row common.KisRow, elemType reflect.Type) (reflect.Value, error) {
	// Check if row is a struct or struct pointer type
	rowType := reflect.TypeOf(row)
	if rowType == nil {
		return reflect.Value{}, fmt.Errorf("row is nil pointer")
	}
	if rowType.Kind() != reflect.Struct && rowType.Kind() != reflect.Ptr {
		return reflect.Value{}, fmt.Errorf("row must be a struct or struct pointer type")
	}

	// If row is a pointer type, get the type it points to
	if rowType.Kind() == reflect.Ptr {
		// Nil pointer
		if reflect.ValueOf(row).IsNil() {
			return reflect.Value{}, fmt.Errorf("row is nil pointer")
		}

		// Dereference
		row = reflect.ValueOf(row).Elem().Interface()

		// Get the type after dereferencing
		rowType = reflect.TypeOf(row)
	}

	// Check if row can be asserted to elemType (target type)
	if !rowType.AssignableTo(elemType) {
		return reflect.Value{}, fmt.Errorf("row type cannot be asserted to elemType")
	}

	// Convert row to reflect.Value and return
	return reflect.ValueOf(row), nil
}

// Attempt to directly deserialize string (deserialize Json string to struct)
func unMarshalJsonString(row common.KisRow, elemType reflect.Type) (reflect.Value, error) {
	// Check if the source data can be asserted as string
	str, ok := row.(string)
	if !ok {
		return reflect.Value{}, fmt.Errorf("not a string")
	}

	// Create a new struct instance to store the deserialized value
	elem := reflect.New(elemType).Elem()

	// Attempt to deserialize the Json string to a struct.
	if err := json.Unmarshal([]byte(str), elem.Addr().Interface()); err != nil {
		return reflect.Value{}, fmt.Errorf("failed to unmarshal string to struct: %v", err)
	}

	return elem, nil
}

// Attempt to first serialize to JSON and then deserialize (convert struct to Json string and then deserialize the Json string to struct)
func unMarshalJsonStruct(row common.KisRow, elemType reflect.Type) (reflect.Value, error) {
	// Serialize row to JSON string
	jsonBytes, err := json.Marshal(row)
	if err != nil {
		return reflect.Value{}, fmt.Errorf("failed to marshal row to JSON: %v  ", err)
	}

	// Create a new struct instance to store the deserialized value
	elem := reflect.New(elemType).Interface()

	// Deserialize JSON string to struct
	if err := json.Unmarshal(jsonBytes, elem); err != nil {
		return reflect.Value{}, fmt.Errorf("failed to unmarshal JSON to element: %v  ", err)
	}

	return reflect.ValueOf(elem).Elem(), nil
}

// Marshal is used to serialize a specified type of value into KisRowArr (Json serialization).
func (f *DefaultSerialize) Marshal(i interface{}) (common.KisRowArr, error) {
	var arr common.KisRowArr

	switch reflect.TypeOf(i).Kind() {
	case reflect.Slice, reflect.Array:
		slice := reflect.ValueOf(i)
		for i := 0; i < slice.Len(); i++ {
			// Serialize each element to a JSON string and append it to the slice.
			jsonBytes, err := json.Marshal(slice.Index(i).Interface())
			if err != nil {
				return nil, fmt.Errorf("failed to marshal element to JSON: %v  ", err)
			}
			arr = append(arr, string(jsonBytes))
		}
	default:
		// If it is not a slice or array type, serialize the entire struct directly to a JSON string.
		jsonBytes, err := json.Marshal(i)
		if err != nil {
			return nil, fmt.Errorf("failed to marshal element to JSON: %v  ", err)
		}
		arr = append(arr, string(jsonBytes))
	}

	return arr, nil
}

10.3 Serialize & FaaS

You can use the current data serialization form serialize.DefaultSerialize by combining serialization tools through the last custom parameter passed in FaaS. Developers can also customize and override a serialize for combined use.

package main

import (
	"context"
	"fmt"
	"github.com/aceld/kis-flow/kis"
	"github.com/aceld/kis-flow/serialize"
)

type VerifyStuIn struct {
	serialize.DefaultSerialize
	StuScore
}

func VerifyStu(ctx context.Context, flow kis.Flow, rows []*VerifyStuIn) error {
	fmt.Printf("->Call Func VerifyStu\n")

	fmt.Println("school = ", flow.GetFuncParam("school"))
	fmt.Println("country = ", flow.GetFuncParam("country"))

	for _, stu := range rows {
		// Filter out invalid data
		if stu.StuId < 0 || stu.StuId > 999 {
			continue
		}

		_ = flow.CommitRow(stu)
	}

	return nil
}