Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add watcher to sync data in etcd to local cache #319

Merged
merged 3 commits into from
Aug 23, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ Release Notes.
- Fix timer not released
- BanyanDB ui misses fields when creating a group
- Fix data duplicate writing
- Syncing metadata change events from etcd instead of a local channel.

### Chores

Expand Down
2 changes: 1 addition & 1 deletion api/proto/banyandb/database/v1/database.proto
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ enum Role {
}

message Node {
string name = 1;
common.v1.Metadata metadata = 1;
repeated Role roles = 2;
string grpc_address = 3;
string http_address = 4;
Expand Down
4 changes: 2 additions & 2 deletions banyand/liaison/grpc/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,8 +130,8 @@ func (ds *discoveryService) initialize(ctx context.Context) error {
return fmt.Errorf("unsupported kind: %d", ds.kind)
}
}
ds.metadataRepo.RegisterHandler(schema.KindShard, ds.shardRepo)
ds.metadataRepo.RegisterHandler(ds.kind, ds.entityRepo)
ds.metadataRepo.RegisterHandler("liaison", schema.KindShard, ds.shardRepo)
ds.metadataRepo.RegisterHandler("liaison", ds.kind, ds.entityRepo)
return nil
}

Expand Down
5 changes: 0 additions & 5 deletions banyand/measure/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,6 @@ func (sr *schemaRepo) OnAddOrUpdate(metadata schema.Metadata) {
})
cancel()
if err != nil {
sr.l.Error().Err(err).Msg("fail to get subject")
return
}
sr.SendMetadataEvent(resourceSchema.MetadataEvent{
Expand All @@ -107,7 +106,6 @@ func (sr *schemaRepo) OnAddOrUpdate(metadata schema.Metadata) {
defer cancel()
subjects, err := sr.metadata.Subjects(ctx, metadata.Spec.(*databasev1.IndexRule), commonv1.Catalog_CATALOG_MEASURE)
if err != nil {
sr.l.Error().Err(err).Msg("fail to get subjects(measure)")
return
}
for _, sub := range subjects {
Expand All @@ -121,7 +119,6 @@ func (sr *schemaRepo) OnAddOrUpdate(metadata schema.Metadata) {
// createOrUpdate TopN schemas in advance
_, err := createOrUpdateTopNMeasure(context.Background(), sr.metadata.MeasureRegistry(), metadata.Spec.(*databasev1.TopNAggregation))
if err != nil {
sr.l.Error().Err(err).Msg("fail to create/update topN measure")
return
}
// reload source measure
Expand Down Expand Up @@ -231,7 +228,6 @@ func (sr *schemaRepo) OnDelete(metadata schema.Metadata) {
Group: metadata.Group,
})
if err != nil {
sr.l.Error().Err(err).Msg("fail to get subject")
return
}
// we should update instead of delete
Expand All @@ -245,7 +241,6 @@ func (sr *schemaRepo) OnDelete(metadata schema.Metadata) {
case schema.KindTopNAggregation:
err := sr.removeTopNMeasure(metadata.Spec.(*databasev1.TopNAggregation).GetSourceMeasure())
if err != nil {
sr.l.Error().Err(err).Msg("fail to remove topN measure")
return
}
// we should update instead of delete
Expand Down
64 changes: 3 additions & 61 deletions banyand/measure/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,8 @@ package measure
import (
"context"
"path"
"time"

"github.com/pkg/errors"
"go.uber.org/multierr"

"github.com/apache/skywalking-banyandb/api/data"
commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
Expand Down Expand Up @@ -107,81 +105,25 @@ func (s *service) Role() databasev1.Role {
return databasev1.Role_ROLE_DATA
}

func (s *service) PreRun(ctx context.Context) error {
func (s *service) PreRun(_ context.Context) error {
s.l = logger.GetLogger(s.Name())
ctxGroup, cancelGroup := context.WithTimeout(ctx, 5*time.Second)
groups, err := s.metadata.GroupRegistry().ListGroup(ctxGroup)
cancelGroup()
if err != nil {
return err
}
path := path.Join(s.root, s.Name())
observability.UpdatePath(path)
s.schemaRepo = newSchemaRepo(path, s.metadata, s.dbOpts,
s.l, s.pipeline, int64(s.BlockEncoderBufferSize), int64(s.BlockBufferSize))
for _, g := range groups {
if g.Catalog != commonv1.Catalog_CATALOG_MEASURE {
continue
}
gp, innerErr := s.schemaRepo.StoreGroup(g.Metadata)
if innerErr != nil {
return innerErr
}
ctxMeasure, cancelMeasure := context.WithTimeout(ctx, 5*time.Second)
allMeasureSchemas, innerErr := s.metadata.MeasureRegistry().
ListMeasure(ctxMeasure, schema.ListOpt{Group: gp.GetSchema().GetMetadata().GetName()})
cancelMeasure()
if innerErr != nil {
return innerErr
}
for _, measureSchema := range allMeasureSchemas {
// sanity check before calling StoreResource
// since StoreResource may be called inside the event loop
if checkErr := s.sanityCheck(ctx, gp, measureSchema); checkErr != nil {
return checkErr
}
if _, innerErr := gp.StoreResource(ctx, measureSchema); innerErr != nil {
return innerErr
}
}
}
// run a serial watcher
go s.schemaRepo.Watcher()
s.metadata.
RegisterHandler(schema.KindGroup|schema.KindMeasure|schema.KindIndexRuleBinding|schema.KindIndexRule|schema.KindTopNAggregation,
RegisterHandler("measure", schema.KindGroup|schema.KindMeasure|schema.KindIndexRuleBinding|schema.KindIndexRule|schema.KindTopNAggregation,
&s.schemaRepo)

s.writeListener = setUpWriteCallback(s.l, &s.schemaRepo)
err = s.pipeline.Subscribe(data.TopicMeasureWrite, s.writeListener)
err := s.pipeline.Subscribe(data.TopicMeasureWrite, s.writeListener)
if err != nil {
return err
}
return nil
}

func (s *service) sanityCheck(ctx context.Context, group resourceSchema.Group, measureSchema *databasev1.Measure) error {
var topNAggrs []*databasev1.TopNAggregation
ctxLocal, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()
topNAggrs, err := s.metadata.MeasureRegistry().TopNAggregations(ctxLocal, measureSchema.GetMetadata())
if err != nil || len(topNAggrs) == 0 {
return err
}

for _, topNAggr := range topNAggrs {
topNMeasure, innerErr := createOrUpdateTopNMeasure(ctx, s.metadata.MeasureRegistry(), topNAggr)
err = multierr.Append(err, innerErr)
if topNMeasure != nil {
_, storeErr := group.StoreResource(ctx, topNMeasure)
if storeErr != nil {
err = multierr.Append(err, storeErr)
}
}
}

return err
}

func (s *service) Serve() run.StopNotify {
return s.schemaRepo.StopCh()
}
Expand Down
10 changes: 6 additions & 4 deletions banyand/metadata/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,9 @@ func (s *clientService) PreRun(ctx context.Context) error {
ctxRegister, cancel := context.WithTimeout(ctx, time.Second*5)
defer cancel()
if err = s.schemaRegistry.RegisterNode(ctxRegister, &databasev1.Node{
Name: node.NodeID,
Metadata: &commonv1.Metadata{
Name: node.NodeID,
},
GrpcAddress: node.GrpcAddress,
HttpAddress: node.HTTPAddress,
Roles: nodeRoles,
Expand All @@ -92,7 +94,7 @@ func (s *clientService) PreRun(ctx context.Context) error {
return err
}
s.alc = newAllocator(s.schemaRegistry, logger.GetLogger(s.Name()).Named("allocator"))
s.schemaRegistry.RegisterHandler(schema.KindGroup|schema.KindNode, s.alc)
s.schemaRegistry.RegisterHandler("shard-allocator", schema.KindGroup|schema.KindNode, s.alc)
return nil
}

Expand All @@ -106,8 +108,8 @@ func (s *clientService) GracefulStop() {
_ = s.schemaRegistry.Close()
}

func (s *clientService) RegisterHandler(kind schema.Kind, handler schema.EventHandler) {
s.schemaRegistry.RegisterHandler(kind, handler)
func (s *clientService) RegisterHandler(name string, kind schema.Kind, handler schema.EventHandler) {
s.schemaRegistry.RegisterHandler(name, kind, handler)
}

func (s *clientService) StreamRegistry() schema.Stream {
Expand Down
2 changes: 1 addition & 1 deletion banyand/metadata/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ type Repo interface {
TopNAggregationRegistry() schema.TopNAggregation
PropertyRegistry() schema.Property
ShardRegistry() schema.Shard
RegisterHandler(schema.Kind, schema.EventHandler)
RegisterHandler(string, schema.Kind, schema.EventHandler)
}

// Service is the metadata repository.
Expand Down
Loading
Loading