Skip to content

Commit

Permalink
Fix bugs and add docs (#548)
Browse files Browse the repository at this point in the history

* Add doc

* Append different tag and field structure

* Fix the bug that accesses restored data

---------

Signed-off-by: Gao Hongtao <hanahmily@gmail.com>
  • Loading branch information
hanahmily authored Oct 6, 2024
1 parent 35306d2 commit d14d599
Show file tree
Hide file tree
Showing 18 changed files with 976 additions and 114 deletions.
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ Release Notes.
- Fix panic when reading a disorder block of measure. This block's versions are not sorted in descending order.
- Fix the bug that the etcd client doesn't reconnect when facing the context timeout in the startup phase.
- Fix the bug that the long running query doesn't stop when the context is canceled.
- Fix the bug that merge block with different tags or fields.

### Documentation

Expand Down
3 changes: 3 additions & 0 deletions banyand/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ COPY build/bin/windows/${TARGETARCH}/banyand-server-static "/banyand"

FROM build-${TARGETOS} AS final

ENV GRPC_GO_LOG_SEVERITY_LEVEL=ERROR
ENV GRPC_GO_LOG_FORMATTER=json

EXPOSE 17912
EXPOSE 17913
EXPOSE 6060
Expand Down
216 changes: 180 additions & 36 deletions banyand/measure/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package measure

import (
"fmt"
"slices"
"sort"

Expand Down Expand Up @@ -787,63 +788,206 @@ func (bi *blockPointer) appendAll(b *blockPointer) {
bi.append(b, len(b.timestamps))
}

var log = logger.GetLogger("measure").Named("block")

func (bi *blockPointer) append(b *blockPointer, offset int) {
if offset <= b.idx {
return
}
if len(bi.tagFamilies) == 0 && len(b.tagFamilies) > 0 {
for _, tf := range b.tagFamilies {
tagFamily := columnFamily{name: tf.name}
for _, c := range tf.columns {
col := column{name: c.name, valueType: c.valueType}
assertIdxAndOffset(col.name, len(c.values), b.idx, offset)
col.values = append(col.values, c.values[b.idx:offset]...)
tagFamily.columns = append(tagFamily.columns, col)
fullTagAppend(bi, b, offset)
} else {
if err := fastTagAppend(bi, b, offset); err != nil {
if log.Debug().Enabled() {
log.Debug().Msgf("fastTagMerge failed: %v; falling back to fullTagMerge", err)
}
bi.tagFamilies = append(bi.tagFamilies, tagFamily)
fullTagAppend(bi, b, offset)
}
}

if len(bi.field.columns) == 0 && len(b.field.columns) > 0 {
fullFieldAppend(bi, b, offset)
} else {
if len(bi.tagFamilies) != len(b.tagFamilies) {
logger.Panicf("unexpected number of tag families: got %d; want %d", len(bi.tagFamilies), len(b.tagFamilies))
if err := fastFieldAppend(bi, b, offset); err != nil {
if log.Debug().Enabled() {
log.Debug().Msgf("fastFieldAppend failed: %v; falling back to fullFieldAppend", err)
}
fullFieldAppend(bi, b, offset)
}
}

assertIdxAndOffset("timestamps", len(b.timestamps), bi.idx, offset)
bi.timestamps = append(bi.timestamps, b.timestamps[b.idx:offset]...)
assertIdxAndOffset("versions", len(b.versions), bi.idx, offset)
bi.versions = append(bi.versions, b.versions[b.idx:offset]...)
}

func fastTagAppend(bi, b *blockPointer, offset int) error {
if len(bi.tagFamilies) != len(b.tagFamilies) {
return fmt.Errorf("unexpected number of tag families: got %d; want %d", len(b.tagFamilies), len(bi.tagFamilies))
}
for i := range bi.tagFamilies {
if bi.tagFamilies[i].name != b.tagFamilies[i].name {
return fmt.Errorf("unexpected tag family name: got %q; want %q", b.tagFamilies[i].name, bi.tagFamilies[i].name)
}
for i := range bi.tagFamilies {
if bi.tagFamilies[i].name != b.tagFamilies[i].name {
logger.Panicf("unexpected tag family name: got %q; want %q", bi.tagFamilies[i].name, b.tagFamilies[i].name)
if len(bi.tagFamilies[i].columns) != len(b.tagFamilies[i].columns) {
return fmt.Errorf("unexpected number of tags for tag family %q: got %d; want %d",
bi.tagFamilies[i].name, len(b.tagFamilies[i].columns), len(bi.tagFamilies[i].columns))
}
for j := range bi.tagFamilies[i].columns {
if bi.tagFamilies[i].columns[j].name != b.tagFamilies[i].columns[j].name {
return fmt.Errorf("unexpected tag name for tag family %q: got %q; want %q",
bi.tagFamilies[i].name, b.tagFamilies[i].columns[j].name, bi.tagFamilies[i].columns[j].name)
}
if len(bi.tagFamilies[i].columns) != len(b.tagFamilies[i].columns) {
logger.Panicf("unexpected number of tags for tag family %q: got %d; want %d", bi.tagFamilies[i].name, len(bi.tagFamilies[i].columns), len(b.tagFamilies[i].columns))
assertIdxAndOffset(b.tagFamilies[i].columns[j].name, len(b.tagFamilies[i].columns[j].values), b.idx, offset)
bi.tagFamilies[i].columns[j].values = append(bi.tagFamilies[i].columns[j].values, b.tagFamilies[i].columns[j].values[b.idx:offset]...)
}
}
return nil
}

func fullTagAppend(bi, b *blockPointer, offset int) {
existDataSize := len(bi.timestamps)
appendTagFamilies := func(tf columnFamily) {
tagFamily := columnFamily{name: tf.name}
for i := range tf.columns {
assertIdxAndOffset(tf.columns[i].name, len(tf.columns[i].values), b.idx, offset)
col := column{name: tf.columns[i].name, valueType: tf.columns[i].valueType}
for j := 0; j < existDataSize; j++ {
col.values = append(col.values, nil)
}
col.values = append(col.values, tf.columns[i].values[b.idx:offset]...)
tagFamily.columns = append(tagFamily.columns, col)
}
bi.tagFamilies = append(bi.tagFamilies, tagFamily)
}
if len(bi.tagFamilies) == 0 {
for _, tf := range b.tagFamilies {
appendTagFamilies(tf)
}
return
}

tagFamilyMap := make(map[string]*columnFamily)
for i := range bi.tagFamilies {
tagFamilyMap[bi.tagFamilies[i].name] = &bi.tagFamilies[i]
}

for _, tf := range b.tagFamilies {
if existingTagFamily, exists := tagFamilyMap[tf.name]; exists {
columnMap := make(map[string]*column)
for i := range existingTagFamily.columns {
columnMap[existingTagFamily.columns[i].name] = &existingTagFamily.columns[i]
}
for j := range bi.tagFamilies[i].columns {
if bi.tagFamilies[i].columns[j].name != b.tagFamilies[i].columns[j].name {
logger.Panicf("unexpected tag name for tag family %q: got %q; want %q", bi.tagFamilies[i].name, bi.tagFamilies[i].columns[j].name, b.tagFamilies[i].columns[j].name)

for _, c := range tf.columns {
if existingColumn, exists := columnMap[c.name]; exists {
assertIdxAndOffset(c.name, len(c.values), b.idx, offset)
existingColumn.values = append(existingColumn.values, c.values[b.idx:offset]...)
} else {
assertIdxAndOffset(c.name, len(c.values), b.idx, offset)
col := column{name: c.name, valueType: c.valueType}
for j := 0; j < existDataSize; j++ {
col.values = append(col.values, nil)
}
col.values = append(col.values, c.values[b.idx:offset]...)
existingTagFamily.columns = append(existingTagFamily.columns, col)
}
assertIdxAndOffset(b.tagFamilies[i].columns[j].name, len(b.tagFamilies[i].columns[j].values), b.idx, offset)
bi.tagFamilies[i].columns[j].values = append(bi.tagFamilies[i].columns[j].values, b.tagFamilies[i].columns[j].values[b.idx:offset]...)
}
} else {
appendTagFamilies(tf)
}
}
for k := range tagFamilyMap {
delete(tagFamilyMap, k)
}
for i := range b.tagFamilies {
tagFamilyMap[b.tagFamilies[i].name] = &b.tagFamilies[i]
}
emptySize := offset - b.idx
for _, tf := range bi.tagFamilies {
if _, exists := tagFamilyMap[tf.name]; !exists {
for i := range tf.columns {
for j := 0; j < emptySize; j++ {
tf.columns[i].values = append(tf.columns[i].values, nil)
}
}
} else {
existingTagFamily := tagFamilyMap[tf.name]
columnMap := make(map[string]*column)
for i := range existingTagFamily.columns {
columnMap[existingTagFamily.columns[i].name] = &existingTagFamily.columns[i]
}
for i := range tf.columns {
if _, exists := columnMap[tf.columns[i].name]; !exists {
for j := 0; j < emptySize; j++ {
tf.columns[i].values = append(tf.columns[i].values, nil)
}
}
}
}
}
}

if len(bi.field.columns) == 0 && len(b.field.columns) > 0 {
for _, c := range b.field.columns {
col := column{name: c.name, valueType: c.valueType}
assertIdxAndOffset(col.name, len(c.values), b.idx, offset)
col.values = append(col.values, c.values[b.idx:offset]...)
bi.field.columns = append(bi.field.columns, col)
func fastFieldAppend(bi, b *blockPointer, offset int) error {
if len(bi.field.columns) != len(b.field.columns) {
return fmt.Errorf("unexpected number of fields: got %d; want %d", len(bi.field.columns), len(b.field.columns))
}
for i := range bi.field.columns {
if bi.field.columns[i].name != b.field.columns[i].name {
return fmt.Errorf("unexpected field name: got %q; want %q", b.field.columns[i].name, bi.field.columns[i].name)
}
} else {
if len(bi.field.columns) != len(b.field.columns) {
logger.Panicf("unexpected number of fields: got %d; want %d", len(bi.field.columns), len(b.field.columns))
assertIdxAndOffset(b.field.columns[i].name, len(b.field.columns[i].values), b.idx, offset)
bi.field.columns[i].values = append(bi.field.columns[i].values, b.field.columns[i].values[b.idx:offset]...)
}
return nil
}

func fullFieldAppend(bi, b *blockPointer, offset int) {
existDataSize := len(bi.timestamps)
appendFields := func(c column) {
col := column{name: c.name, valueType: c.valueType}
for j := 0; j < existDataSize; j++ {
col.values = append(col.values, nil)
}
for i := range bi.field.columns {
assertIdxAndOffset(b.field.columns[i].name, len(b.field.columns[i].values), b.idx, offset)
bi.field.columns[i].values = append(bi.field.columns[i].values, b.field.columns[i].values[b.idx:offset]...)
col.values = append(col.values, c.values[b.idx:offset]...)
bi.field.columns = append(bi.field.columns, col)
}
if len(bi.field.columns) == 0 {
for _, c := range b.field.columns {
appendFields(c)
}
return
}

assertIdxAndOffset("timestamps", len(b.timestamps), bi.idx, offset)
bi.timestamps = append(bi.timestamps, b.timestamps[b.idx:offset]...)
assertIdxAndOffset("versions", len(b.versions), bi.idx, offset)
bi.versions = append(bi.versions, b.versions[b.idx:offset]...)
fieldMap := make(map[string]*column)
for i := range bi.field.columns {
fieldMap[bi.field.columns[i].name] = &bi.field.columns[i]
}

for _, c := range b.field.columns {
if existingField, exists := fieldMap[c.name]; exists {
assertIdxAndOffset(c.name, len(c.values), b.idx, offset)
existingField.values = append(existingField.values, c.values[b.idx:offset]...)
} else {
appendFields(c)
}
}
for k := range fieldMap {
delete(fieldMap, k)
}
for i := range b.field.columns {
fieldMap[b.field.columns[i].name] = &b.field.columns[i]
}

emptySize := offset - b.idx
for i := range bi.field.columns {
if _, exists := fieldMap[bi.field.columns[i].name]; !exists {
for j := 0; j < emptySize; j++ {
bi.field.columns[i].values = append(bi.field.columns[i].values, nil)
}
}
}
}

func assertIdxAndOffset(name string, length int, idx int, offset int) {
Expand Down
Loading

0 comments on commit d14d599

Please sign in to comment.