Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ResultMarshal-range #99

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
114 changes: 106 additions & 8 deletions tablestore/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,13 @@ import (
"strings"
"time"

"github.com/aliyun/aliyun-tablestore-go-sdk/common"
"github.com/aliyun/aliyun-tablestore-go-sdk/tablestore/otsprotocol"
"sync"

"github.com/golang/protobuf/proto"
lruCache "github.com/hashicorp/golang-lru"
"sync"

"github.com/aliyun/aliyun-tablestore-go-sdk/common"
"github.com/aliyun/aliyun-tablestore-go-sdk/tablestore/otsprotocol"
)

type internalClient struct {
Expand Down Expand Up @@ -368,7 +370,7 @@ type VariantType int32
const (
Variant_INTEGER VariantType = 0
Variant_DOUBLE VariantType = 1
//VT_BOOLEAN = 2;
// VT_BOOLEAN = 2;
Variant_STRING VariantType = 3
)

Expand All @@ -380,7 +382,7 @@ type ValueTransferRule struct {
type SingleColumnCondition struct {
Comparator *ComparatorType
ColumnName *string
ColumnValue interface{} //[]byte
ColumnValue interface{} // []byte
FilterIfMissing bool
LatestVersionOnly bool
TransferRule *ValueTransferRule
Expand Down Expand Up @@ -555,6 +557,24 @@ type GetRowRequest struct {
ExtraRequestInfo
}

func (response GetRowResponse) Marshal() (primaryKeys map[string]interface{}, rows map[string]interface{}) {
primaryKeys = make(map[string]interface{})
rows = make(map[string]interface{})
PrimaryKeyRaws := response.PrimaryKey.PrimaryKeys
index := uint(0)
for range PrimaryKeyRaws {
primaryKeys[PrimaryKeyRaws[index].ColumnName] = PrimaryKeyRaws[index].Value
index++
}
index = 0
RowRaws := response.Columns
for range RowRaws {
rows[RowRaws[index].ColumnName] = RowRaws[index].Value
index++
}
return
}

type MultiRowQueryCriteria struct {
PrimaryKey []*PrimaryKey
ColumnsToGet []string
Expand Down Expand Up @@ -599,6 +619,24 @@ type RowResult struct {
Index int32
}

func (rowResponse RowResult) Marshal() (PrimaryKeys map[string]interface{}, Rows map[string]interface{}) {
index := uint(0)
PrimaryKeys = make(map[string]interface{})
primaryKeysRaw := rowResponse.PrimaryKey.PrimaryKeys
for range primaryKeysRaw {
PrimaryKeys[primaryKeysRaw[index].ColumnName] = primaryKeysRaw[index].Value
index++
}
index = 0 // reset index
Rows = make(map[string]interface{})
rowsRaw := rowResponse.Columns
for range rowsRaw {
Rows[rowsRaw[index].ColumnName] = rowsRaw[index].Value
index++
}
return
}

type RowChange interface {
Serialize() []byte
getOperationType() otsprotocol.OperationType
Expand All @@ -610,8 +648,50 @@ type BatchGetRowResponse struct {
TableToRowsResult map[string][]RowResult
ResponseInfo
}
type SingleRow struct {
PrimaryKey map[string]interface{} // 主键
Row map[string]interface{} // 行数据
Err Error // 错误信息
}
type Rows struct {
mu sync.Mutex
Data RowData
}
type RowData map[int32]*SingleRow

func (response *GetRangeResponse) Marshal() (*RowData, error) {
rows := &Rows{Data: make(map[int32]*SingleRow)}
var wg sync.WaitGroup
if len(response.Rows) <= 256 {
rowData := make(RowData)

for index, data := range response.Rows {
pk, row := data.Marshal()
rowData[int32(index)] = &SingleRow{PrimaryKey: pk, Row: row}
}
return &rowData, nil
}
for index, data := range response.Rows { // 假设response.Rows是Row类型的切片,且Row实现了Marshal方法
wg.Add(1)
go func(index int32, data *Row) {
defer wg.Done()

singleRow := new(SingleRow)
singleRow.PrimaryKey, singleRow.Row = data.Marshal() // 假设Row类型有Marshal方法可以直接使用

// 在写入之前锁定mutex
rows.mu.Lock()
rows.Data[index] = singleRow
rows.mu.Unlock()
}(int32(index), data)
}
wg.Wait()
// 返回指向Rows.Data的指针,满足函数签名的要求
return &rows.Data, nil
}

// IsAtomic设置是否为批量原子写
// BatchWriteRowRequest
// IsAtomic 设置是否为批量原子写
// 如果设置了批量原子写,需要保证写入到同一张表格中的分区键相同,否则会写入失败
type BatchWriteRowRequest struct {
RowChangesGroupByTable map[string][]RowChange
Expand Down Expand Up @@ -708,6 +788,24 @@ type GetRangeResponse struct {
ResponseInfo
}

func (response Row) Marshal() (PrimaryKeys map[string]interface{}, rows map[string]interface{}) {
PrimaryKeysRaw := response.PrimaryKey.PrimaryKeys
PrimaryKeys = make(map[string]interface{})
index := uint(0)
for range PrimaryKeysRaw {
PrimaryKeys[PrimaryKeysRaw[index].ColumnName] = PrimaryKeysRaw[index].Value
index++
}
index = 0
rows = make(map[string]interface{})
rowsRaw := response.Columns
for range rowsRaw {
rows[rowsRaw[index].ColumnName] = rowsRaw[index].Value
index++
}
return
}

type SQLQueryRequest struct {
Query string
ExtraRequestInfo
Expand Down Expand Up @@ -756,15 +854,15 @@ type ListStreamResponse struct {
type StreamSpecification struct {
EnableStream bool
ExpirationTime int32 // must be positive. in hours
OriginColumnsToGet []string //origin columns to get for stream data
OriginColumnsToGet []string // origin columns to get for stream data
}

type StreamDetails struct {
EnableStream bool
StreamId *StreamId // nil when stream is disabled.
ExpirationTime int32 // in hours
LastEnableTime int64 // the last time stream is enabled, in usec
OriginColumnsToGet []string //origin columns to get for stream data
OriginColumnsToGet []string // origin columns to get for stream data
}

type DescribeStreamRequest struct {
Expand Down