Skip to content

Commit

Permalink
Add mod revision check to write requests(measure/stream) (#322)
Browse files Browse the repository at this point in the history
* Add mod revision check to write requests(measure/stream)

- Support for create/update schema check mod revision
- Support for write data check mod revision
- Adapt to web console requests
- Update OAP e2e image
- Fix query failure caused by schema change

---------

Co-authored-by: Gao Hongtao <hanahmily@gmail.com>
  • Loading branch information
hailin0 and hanahmily authored Sep 15, 2023
1 parent 4722514 commit 9552688
Show file tree
Hide file tree
Showing 39 changed files with 382 additions and 150 deletions.
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ Release Notes.
- Implement the remote queue to spreading data to data nodes.
- Fix parse environment variables error
- Implement the distributed query engine.
- Add mod revision check to write requests.

### Bugs

Expand Down
16 changes: 12 additions & 4 deletions api/proto/banyandb/database/v1/rpc.proto
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,17 @@ message StreamRegistryServiceCreateRequest {
banyandb.database.v1.Stream stream = 1;
}

message StreamRegistryServiceCreateResponse {}
message StreamRegistryServiceCreateResponse {
int64 mod_revision = 1;
}

message StreamRegistryServiceUpdateRequest {
banyandb.database.v1.Stream stream = 1;
}

message StreamRegistryServiceUpdateResponse {}
message StreamRegistryServiceUpdateResponse {
int64 mod_revision = 1;
}

message StreamRegistryServiceDeleteRequest {
banyandb.common.v1.Metadata metadata = 1;
Expand Down Expand Up @@ -260,13 +264,17 @@ message MeasureRegistryServiceCreateRequest {
banyandb.database.v1.Measure measure = 1;
}

message MeasureRegistryServiceCreateResponse {}
message MeasureRegistryServiceCreateResponse {
int64 mod_revision = 1;
}

message MeasureRegistryServiceUpdateRequest {
banyandb.database.v1.Measure measure = 1;
}

message MeasureRegistryServiceUpdateResponse {}
message MeasureRegistryServiceUpdateResponse {
int64 mod_revision = 1;
}

message MeasureRegistryServiceDeleteRequest {
banyandb.common.v1.Metadata metadata = 1;
Expand Down
12 changes: 11 additions & 1 deletion api/proto/banyandb/measure/v1/write.proto
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ package banyandb.measure.v1;

import "banyandb/common/v1/common.proto";
import "banyandb/model/v1/common.proto";
import "banyandb/model/v1/write.proto";
import "google/protobuf/timestamp.proto";
import "validate/validate.proto";

Expand All @@ -43,10 +44,19 @@ message WriteRequest {
common.v1.Metadata metadata = 1 [(validate.rules).message.required = true];
// the data_point is required.
DataPointValue data_point = 2 [(validate.rules).message.required = true];
// the message_id is required.
uint64 message_id = 3 [(validate.rules).uint64.gt = 0];
}

// WriteResponse is the response contract for write
message WriteResponse {}
message WriteResponse {
// the message_id from request.
uint64 message_id = 1 [(validate.rules).uint64.gt = 0];
// status indicates the request processing result
model.v1.Status status = 2 [(validate.rules).enum.defined_only = true];
// the metadata from request when request fails
common.v1.Metadata metadata = 3 [(validate.rules).message.required = true];
}

message InternalWriteRequest {
uint32 shard_id = 1;
Expand Down
33 changes: 33 additions & 0 deletions api/proto/banyandb/model/v1/write.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
// Licensed to Apache Software Foundation (ASF) under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Apache Software Foundation (ASF) licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

syntax = "proto3";

package banyandb.model.v1;

option go_package = "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1";
option java_package = "org.apache.skywalking.banyandb.model.v1";

// Status is the response status for write
enum Status {
STATUS_UNSPECIFIED = 0;
STATUS_SUCCEED = 1;
STATUS_INVALID_TIMESTAMP = 2;
STATUS_NOT_FOUND = 3;
STATUS_EXPIRED_SCHEMA = 4;
STATUS_INTERNAL_ERROR = 5;
}
14 changes: 12 additions & 2 deletions api/proto/banyandb/stream/v1/write.proto
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ package banyandb.stream.v1;

import "banyandb/common/v1/common.proto";
import "banyandb/model/v1/common.proto";
import "banyandb/model/v1/write.proto";
import "google/protobuf/timestamp.proto";
import "validate/validate.proto";

Expand All @@ -39,13 +40,22 @@ message ElementValue {
}

message WriteRequest {
// the metadata is only required in the first write.
// the metadata is required.
common.v1.Metadata metadata = 1 [(validate.rules).message.required = true];
// the element is required.
ElementValue element = 2 [(validate.rules).message.required = true];
// the message_id is required.
uint64 message_id = 3 [(validate.rules).uint64.gt = 0];
}

message WriteResponse {}
message WriteResponse {
// the message_id from request.
uint64 message_id = 1 [(validate.rules).uint64.gt = 0];
// status indicates the request processing result
model.v1.Status status = 2 [(validate.rules).enum.defined_only = true];
// the metadata from request when request fails
common.v1.Metadata metadata = 3 [(validate.rules).message.required = true];
}

message InternalWriteRequest {
uint32 shard_id = 1;
Expand Down
15 changes: 9 additions & 6 deletions banyand/liaison/grpc/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,14 +231,17 @@ type entityRepo struct {
func (e *entityRepo) OnAddOrUpdate(schemaMetadata schema.Metadata) {
var el partition.EntityLocator
var id identity
var modRevision int64
switch schemaMetadata.Kind {
case schema.KindMeasure:
measure := schemaMetadata.Spec.(*databasev1.Measure)
el = partition.NewEntityLocator(measure.TagFamilies, measure.Entity)
modRevision = measure.GetMetadata().GetModRevision()
el = partition.NewEntityLocator(measure.TagFamilies, measure.Entity, modRevision)
id = getID(measure.GetMetadata())
case schema.KindStream:
stream := schemaMetadata.Spec.(*databasev1.Stream)
el = partition.NewEntityLocator(stream.TagFamilies, stream.Entity)
modRevision = stream.GetMetadata().GetModRevision()
el = partition.NewEntityLocator(stream.TagFamilies, stream.Entity, modRevision)
id = getID(stream.GetMetadata())
default:
return
Expand All @@ -259,16 +262,16 @@ func (e *entityRepo) OnAddOrUpdate(schemaMetadata schema.Metadata) {
Str("kind", kind).
Msg("entity added or updated")
}
en := make(partition.EntityLocator, 0, len(el))
for _, l := range el {
en := make([]partition.TagLocator, 0, len(el.TagLocators))
for _, l := range el.TagLocators {
en = append(en, partition.TagLocator{
FamilyOffset: l.FamilyOffset,
TagOffset: l.TagOffset,
})
}
e.RWMutex.Lock()
defer e.RWMutex.Unlock()
e.entitiesMap[id] = en
e.entitiesMap[id] = partition.EntityLocator{TagLocators: en, ModRevision: modRevision}
}

// OnDelete implements schema.EventHandler.
Expand Down Expand Up @@ -310,7 +313,7 @@ func (e *entityRepo) getLocator(id identity) (partition.EntityLocator, bool) {
defer e.RWMutex.RUnlock()
el, ok := e.entitiesMap[id]
if !ok {
return nil, false
return partition.EntityLocator{}, false
}
return el, true
}
26 changes: 19 additions & 7 deletions banyand/liaison/grpc/measure.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,9 @@ import (

"github.com/apache/skywalking-banyandb/api/common"
"github.com/apache/skywalking-banyandb/api/data"
commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
measurev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1"
modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
"github.com/apache/skywalking-banyandb/banyand/tsdb"
"github.com/apache/skywalking-banyandb/pkg/accesslog"
"github.com/apache/skywalking-banyandb/pkg/bus"
Expand Down Expand Up @@ -56,8 +58,8 @@ func (ms *measureService) activeIngestionAccessLog(root string) (err error) {
}

func (ms *measureService) Write(measure measurev1.MeasureService_WriteServer) error {
reply := func(measure measurev1.MeasureService_WriteServer, logger *logger.Logger) {
if errResp := measure.Send(&measurev1.WriteResponse{}); errResp != nil {
reply := func(metadata *commonv1.Metadata, status modelv1.Status, messageId uint64, measure measurev1.MeasureService_WriteServer, logger *logger.Logger) {
if errResp := measure.Send(&measurev1.WriteResponse{Metadata: metadata, Status: status, MessageId: messageId}); errResp != nil {
logger.Err(errResp).Msg("failed to send response")
}
}
Expand All @@ -76,18 +78,28 @@ func (ms *measureService) Write(measure measurev1.MeasureService_WriteServer) er
}
if err != nil {
ms.sampled.Error().Err(err).Stringer("written", writeRequest).Msg("failed to receive message")
reply(measure, ms.sampled)
continue
return err
}
if errTime := timestamp.CheckPb(writeRequest.DataPoint.Timestamp); errTime != nil {
ms.sampled.Error().Err(errTime).Stringer("written", writeRequest).Msg("the data point time is invalid")
reply(measure, ms.sampled)
reply(writeRequest.GetMetadata(), modelv1.Status_STATUS_INVALID_TIMESTAMP, writeRequest.GetMessageId(), measure, ms.sampled)
continue
}
measureCache, existed := ms.entityRepo.getLocator(getID(writeRequest.GetMetadata()))
if !existed {
ms.sampled.Error().Err(err).Stringer("written", writeRequest).Msg("failed to measure schema not found")
reply(writeRequest.GetMetadata(), modelv1.Status_STATUS_NOT_FOUND, writeRequest.GetMessageId(), measure, ms.sampled)
continue
}
if writeRequest.Metadata.ModRevision != measureCache.ModRevision {
ms.sampled.Error().Stringer("written", writeRequest).Msg("the measure schema is expired")
reply(writeRequest.GetMetadata(), modelv1.Status_STATUS_EXPIRED_SCHEMA, writeRequest.GetMessageId(), measure, ms.sampled)
continue
}
entity, tagValues, shardID, err := ms.navigate(writeRequest.GetMetadata(), writeRequest.GetDataPoint().GetTagFamilies())
if err != nil {
ms.sampled.Error().Err(err).RawJSON("written", logger.Proto(writeRequest)).Msg("failed to navigate to the write target")
reply(measure, ms.sampled)
reply(writeRequest.GetMetadata(), modelv1.Status_STATUS_INTERNAL_ERROR, writeRequest.GetMessageId(), measure, ms.sampled)
continue
}
if ms.ingestionAccessLog != nil {
Expand All @@ -107,7 +119,7 @@ func (ms *measureService) Write(measure measurev1.MeasureService_WriteServer) er
if errWritePub != nil {
ms.sampled.Error().Err(errWritePub).RawJSON("written", logger.Proto(writeRequest)).Msg("failed to send a message")
}
reply(measure, ms.sampled)
reply(nil, modelv1.Status_STATUS_SUCCEED, writeRequest.GetMessageId(), measure, ms.sampled)
}
}

Expand Down
28 changes: 20 additions & 8 deletions banyand/liaison/grpc/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,19 +38,25 @@ type streamRegistryServer struct {
func (rs *streamRegistryServer) Create(ctx context.Context,
req *databasev1.StreamRegistryServiceCreateRequest,
) (*databasev1.StreamRegistryServiceCreateResponse, error) {
if err := rs.schemaRegistry.StreamRegistry().CreateStream(ctx, req.GetStream()); err != nil {
modRevision, err := rs.schemaRegistry.StreamRegistry().CreateStream(ctx, req.GetStream())
if err != nil {
return nil, err
}
return &databasev1.StreamRegistryServiceCreateResponse{}, nil
return &databasev1.StreamRegistryServiceCreateResponse{
ModRevision: modRevision,
}, nil
}

func (rs *streamRegistryServer) Update(ctx context.Context,
req *databasev1.StreamRegistryServiceUpdateRequest,
) (*databasev1.StreamRegistryServiceUpdateResponse, error) {
if err := rs.schemaRegistry.StreamRegistry().UpdateStream(ctx, req.GetStream()); err != nil {
modRevision, err := rs.schemaRegistry.StreamRegistry().UpdateStream(ctx, req.GetStream())
if err != nil {
return nil, err
}
return &databasev1.StreamRegistryServiceUpdateResponse{}, nil
return &databasev1.StreamRegistryServiceUpdateResponse{
ModRevision: modRevision,
}, nil
}

func (rs *streamRegistryServer) Delete(ctx context.Context,
Expand Down Expand Up @@ -287,19 +293,25 @@ type measureRegistryServer struct {
func (rs *measureRegistryServer) Create(ctx context.Context, req *databasev1.MeasureRegistryServiceCreateRequest) (
*databasev1.MeasureRegistryServiceCreateResponse, error,
) {
if err := rs.schemaRegistry.MeasureRegistry().CreateMeasure(ctx, req.GetMeasure()); err != nil {
modRevision, err := rs.schemaRegistry.MeasureRegistry().CreateMeasure(ctx, req.GetMeasure())
if err != nil {
return nil, err
}
return &databasev1.MeasureRegistryServiceCreateResponse{}, nil
return &databasev1.MeasureRegistryServiceCreateResponse{
ModRevision: modRevision,
}, nil
}

func (rs *measureRegistryServer) Update(ctx context.Context, req *databasev1.MeasureRegistryServiceUpdateRequest) (
*databasev1.MeasureRegistryServiceUpdateResponse, error,
) {
if err := rs.schemaRegistry.MeasureRegistry().UpdateMeasure(ctx, req.GetMeasure()); err != nil {
modRevision, err := rs.schemaRegistry.MeasureRegistry().UpdateMeasure(ctx, req.GetMeasure())
if err != nil {
return nil, err
}
return &databasev1.MeasureRegistryServiceUpdateResponse{}, nil
return &databasev1.MeasureRegistryServiceUpdateResponse{
ModRevision: modRevision,
}, nil
}

func (rs *measureRegistryServer) Delete(ctx context.Context, req *databasev1.MeasureRegistryServiceDeleteRequest) (
Expand Down
26 changes: 19 additions & 7 deletions banyand/liaison/grpc/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ import (

"github.com/apache/skywalking-banyandb/api/common"
"github.com/apache/skywalking-banyandb/api/data"
commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
streamv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v1"
"github.com/apache/skywalking-banyandb/banyand/tsdb"
"github.com/apache/skywalking-banyandb/pkg/accesslog"
Expand Down Expand Up @@ -56,8 +58,8 @@ func (s *streamService) activeIngestionAccessLog(root string) (err error) {
}

func (s *streamService) Write(stream streamv1.StreamService_WriteServer) error {
reply := func(stream streamv1.StreamService_WriteServer, logger *logger.Logger) {
if errResp := stream.Send(&streamv1.WriteResponse{}); errResp != nil {
reply := func(metadata *commonv1.Metadata, status modelv1.Status, messageId uint64, stream streamv1.StreamService_WriteServer, logger *logger.Logger) {
if errResp := stream.Send(&streamv1.WriteResponse{Metadata: metadata, Status: status, MessageId: messageId}); errResp != nil {
logger.Err(errResp).Msg("failed to send response")
}
}
Expand All @@ -76,18 +78,28 @@ func (s *streamService) Write(stream streamv1.StreamService_WriteServer) error {
}
if err != nil {
s.sampled.Error().Stringer("written", writeEntity).Err(err).Msg("failed to receive message")
reply(stream, s.sampled)
continue
return err
}
if errTime := timestamp.CheckPb(writeEntity.GetElement().Timestamp); errTime != nil {
s.sampled.Error().Stringer("written", writeEntity).Err(errTime).Msg("the element time is invalid")
reply(stream, s.sampled)
reply(nil, modelv1.Status_STATUS_INVALID_TIMESTAMP, writeEntity.GetMessageId(), stream, s.sampled)
continue
}
streamCache, existed := s.entityRepo.getLocator(getID(writeEntity.GetMetadata()))
if !existed {
s.sampled.Error().Err(err).Stringer("written", writeEntity).Msg("failed to stream schema not found")
reply(writeEntity.GetMetadata(), modelv1.Status_STATUS_NOT_FOUND, writeEntity.GetMessageId(), stream, s.sampled)
continue
}
if writeEntity.Metadata.ModRevision != streamCache.ModRevision {
s.sampled.Error().Stringer("written", writeEntity).Msg("the stream schema is expired")
reply(writeEntity.GetMetadata(), modelv1.Status_STATUS_EXPIRED_SCHEMA, writeEntity.GetMessageId(), stream, s.sampled)
continue
}
entity, tagValues, shardID, err := s.navigate(writeEntity.GetMetadata(), writeEntity.GetElement().GetTagFamilies())
if err != nil {
s.sampled.Error().Err(err).RawJSON("written", logger.Proto(writeEntity)).Msg("failed to navigate to the write target")
reply(stream, s.sampled)
reply(nil, modelv1.Status_STATUS_INTERNAL_ERROR, writeEntity.GetMessageId(), stream, s.sampled)
continue
}
if s.ingestionAccessLog != nil {
Expand All @@ -108,7 +120,7 @@ func (s *streamService) Write(stream streamv1.StreamService_WriteServer) error {
if errWritePub != nil {
s.sampled.Error().Err(errWritePub).RawJSON("written", logger.Proto(writeEntity)).Msg("failed to send a message")
}
reply(stream, s.sampled)
reply(nil, modelv1.Status_STATUS_SUCCEED, writeEntity.GetMessageId(), stream, s.sampled)
}
}

Expand Down
3 changes: 2 additions & 1 deletion banyand/measure/measure_topn.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,8 @@ func (t *topNStreamingProcessor) writeData(eventTime time.Time, timeBucket strin
measureID := group + "_" + strconv.Itoa(rankNum) + "_" + timeBucket
iwr := &measurev1.InternalWriteRequest{
Request: &measurev1.WriteRequest{
Metadata: t.topNSchema.GetMetadata(),
MessageId: uint64(time.Now().UnixNano()),
Metadata: t.topNSchema.GetMetadata(),
DataPoint: &measurev1.DataPointValue{
Timestamp: timestamppb.New(eventTime),
TagFamilies: []*modelv1.TagFamilyForWrite{
Expand Down
1 change: 1 addition & 0 deletions banyand/measure/measure_write.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ func (s *measure) write(shardID common.ShardID, entity []byte, entityValues tsdb
Request: &measurev1.WriteRequest{
Metadata: s.GetSchema().Metadata,
DataPoint: value,
MessageId: uint64(time.Now().UnixNano()),
},
EntityValues: entityValues[1:],
})
Expand Down
Loading

0 comments on commit 9552688

Please sign in to comment.