Skip to content

Commit

Permalink
Ignore invalid metadata events
Browse files Browse the repository at this point in the history
Signed-off-by: Gao Hongtao <hanahmily@gmail.com>
  • Loading branch information
hanahmily committed Sep 11, 2023
1 parent 7517d35 commit f375c7c
Showing 1 changed file with 17 additions and 5 deletions.
22 changes: 17 additions & 5 deletions pkg/schema/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ func (sr *schemaRepo) Watcher() {
case EventKindGroup:
_, err = sr.StoreGroup(evt.Metadata)
case EventKindResource:
_, err = sr.storeResource(evt.Metadata)
err = sr.storeResource(evt.Metadata)
}
case EventDelete:
switch evt.Kind {
Expand All @@ -194,6 +194,11 @@ func (sr *schemaRepo) Watcher() {
}
}
if err != nil && !errors.Is(err, schema.ErrClosed) {
select {
case <-sr.closer.CloseNotify():
return
default:
}
sr.l.Err(err).Interface("event", evt).Msg("fail to handle the metadata event. retry...")
sr.SendMetadataEvent(evt)
}
Expand Down Expand Up @@ -297,19 +302,26 @@ func (sr *schemaRepo) LoadResource(metadata *commonv1.Metadata) (Resource, bool)
return g.LoadResource(metadata.Name)
}

func (sr *schemaRepo) storeResource(metadata *commonv1.Metadata) (Resource, error) {
func (sr *schemaRepo) storeResource(metadata *commonv1.Metadata) error {
g, ok := sr.LoadGroup(metadata.Group)
if !ok {
var err error
if g, err = sr.StoreGroup(&commonv1.Metadata{Name: metadata.Group}); err != nil {
return nil, errors.WithMessagef(err, "create unknown group:%s", metadata.Group)
return errors.WithMessagef(err, "create unknown group:%s", metadata.Group)
}
}
stm, err := sr.resourceSchemaSupplier.ResourceSchema(metadata)
if err != nil {
return nil, errors.WithMessage(err, "fails to get the resource")
if errors.Is(err, schema.ErrGRPCResourceNotFound) {
if dl := sr.l.Debug(); dl.Enabled() {
dl.Interface("metadata", metadata).Msg("resource not found")
}
return nil
}
return errors.WithMessage(err, "fails to get the resource")
}
return g.(*group).storeResource(context.Background(), stm)
_, err = g.(*group).storeResource(context.Background(), stm)
return err
}

func (sr *schemaRepo) deleteResource(metadata *commonv1.Metadata) error {
Expand Down

0 comments on commit f375c7c

Please sign in to comment.