Skip to content

Commit

Permalink
Merge branch 'main' into dev_measure_aggregate_function
Browse files Browse the repository at this point in the history
  • Loading branch information
hanahmily authored Sep 3, 2024
2 parents f6b6fe6 + 2bad32a commit 61eb634
Show file tree
Hide file tree
Showing 76 changed files with 1,250 additions and 582 deletions.
4 changes: 3 additions & 1 deletion .github/workflows/e2e.storage.yml
Original file line number Diff line number Diff line change
Expand Up @@ -88,9 +88,11 @@ jobs:
run: make generate
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@v1
- name: Build binary
run: make release
- name: Build docker image
run: |
make docker.build || make docker.build
make -C test/docker build | make -C test/docker build
docker image ls
- name: ${{ matrix.test.name }}
uses: apache/skywalking-infra-e2e@cf589b4a0b9f8e6f436f78e9cfd94a1ee5494180
Expand Down
10 changes: 7 additions & 3 deletions .github/workflows/publish-docker.yml
Original file line number Diff line number Diff line change
Expand Up @@ -75,14 +75,17 @@ jobs:
run: make generate
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@v1
- name: Build Linux binaries
run: |
TARGET_OS=linux PLATFORMS=linux/amd64,linux/arm64 make release
- name: Build Windows binaries
run: |
GOOS=windows GOARCH=amd64 make -C banyand banyand-server-static
GOOS=windows GOARCH=amd64 make -C bydbctl build
TARGET_OS=windows PLATFORMS=windows/amd64 make release
- name: Build docker image
if: github.ref != 'refs/heads/main' # Only build docker image on PR(Push image when pushed to main branch)
run: |
make docker.build || make docker.build
make -C test/docker build | make -C test/docker build
docker image ls
- name: Log in to the Container registry
uses: docker/login-action@v1.10.0
Expand All @@ -94,4 +97,5 @@ jobs:
- name: Push docker image
if: github.ref == 'refs/heads/main'
run: |
make docker.push || make docker.push
PLATFROMS=linux/amd64,linux/arm64,windows/amd64 make docker.push || make docker.push
make -C test/docker push | make -C test/docker push
11 changes: 10 additions & 1 deletion CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ Release Notes.
- Introduce the round-robin selector to Liaison Node.
- Optimize query performance of series index.
- Add liaison, remote queue, storage(rotation), time-series tables, metadata cache and scheduler metrics.
- Add HTTP health check endpoint for the data node.
- Add slow query log for the distributed query and local query.

### Bugs

Expand All @@ -49,10 +51,17 @@ Release Notes.
- Add web-ui interacting guide.
- Add bydbctl interacting guide.
- Add cluster management guide.
- Add operation related documents: configuration, troubleshooting, system, upgrade, and observability.

### Chores

Bump up the version of infra e2e framework.
- Bump up the version of infra e2e framework.
- Separate the monolithic release package into two packages: banyand and bydbctl.
- Separate the monolithic Docker image into two images: banyand and bydbctl.
- Update CI to publish linux/amd64 and linux/arm64 Docker images.
- Make the build system compiles the binary based on the platform which is running on.
- Push "skywalking-banyandb-test" image for e2e and stress test. This image contains bydbctl to do a health check.
- Set etcd-client log level to "error" and etcd-server log level to "error".

## 0.6.1

Expand Down
19 changes: 6 additions & 13 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -149,20 +149,12 @@ license-dep: default ## Fix license header issues
##@ Docker targets

docker.build: TARGET=docker
docker.build: DIR=docker
docker.build:
$(MAKE) $(TARGET) -C $(DIR); \
if [ $$? -ne 0 ]; then \
exit 1; \
fi; \
docker.build: PROJECTS:= banyand bydbctl
docker.build: default ## Build docker images

docker.push: TARGET=docker.push
docker.push: DIR=docker
docker.push:
$(MAKE) $(TARGET) -C $(DIR); \
if [ $$? -ne 0 ]; then \
exit 1; \
fi; \
docker.push: PROJECTS:= banyand bydbctl
docker.push: default ## Push docker images

default:
@for PRJ in $(PROJECTS); do \
Expand All @@ -189,7 +181,8 @@ release-source: clean ## Package source archive
${RELEASE_SCRIPTS} -s

release-sign: ## Sign artifacts
${RELEASE_SCRIPTS} -k bin
${RELEASE_SCRIPTS} -k banyand
${RELEASE_SCRIPTS} -k bydbctl
${RELEASE_SCRIPTS} -k src

release-assembly: release-binary release-sign ## Generate release package
Expand Down
46 changes: 10 additions & 36 deletions docker/Dockerfile → banyand/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -14,54 +14,28 @@
# See the License for the specific language governing permissions and
# limitations under the License.

FROM golang:1.22 AS dev
WORKDIR /app
ENV GOOS="linux"
ENV CGO_ENABLED=0

RUN go install github.com/cosmtrek/air@latest \
&& go install github.com/go-delve/delve/cmd/dlv@latest

EXPOSE 8080
EXPOSE 2345

ENTRYPOINT ["air"]

FROM golang:1.22 AS base

ENV GOPATH "/go"
ENV GO111MODULE "on"
WORKDIR /src
COPY go.* ./
RUN go mod download

FROM base AS builder

RUN --mount=target=. \
--mount=type=cache,target=/root/.cache/go-build \
BUILD_DIR=/out make -C banyand banyand-server-static
RUN --mount=target=. \
--mount=type=cache,target=/root/.cache/go-build \
BUILD_DIR=/out make -C bydbctl build

FROM alpine:edge AS certs
RUN apk add --no-cache ca-certificates && update-ca-certificates

FROM busybox:stable-glibc as build-linux
FROM busybox:stable-glibc AS build-linux

COPY --from=builder /out/banyand-server-static /banyand
ARG TARGETARCH

COPY build/bin/linux/${TARGETARCH}/banyand-server-static /banyand
COPY --from=certs /etc/ssl/certs /etc/ssl/certs
COPY --from=builder /out/bydbctl /bydbctl

FROM mcr.microsoft.com/windows/servercore:ltsc2022 as build-windows
FROM mcr.microsoft.com/windows/servercore:ltsc2022 AS build-windows

ARG TARGETARCH

COPY banyand/build/bin/banyand-server-static "/banyand"
COPY bydbctl/build/bin/bydbctl "/bydbctl"
COPY build/bin/windows/${TARGETARCH}/banyand-server-static "/banyand"

FROM build-${TARGETOS} AS final

EXPOSE 17912
EXPOSE 17913
EXPOSE 6060
EXPOSE 2121

ENTRYPOINT ["/banyand"]
ENTRYPOINT ["/banyand"]
9 changes: 8 additions & 1 deletion banyand/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,22 @@
NAME := banyand
SERVER := $(NAME)-server
BINARIES := $(SERVER)
DEBUG_BINARIES := $(SERVER)-debug

IMG_NAME := skywalking-banyandb

include ../scripts/build/version.mk
include ../scripts/build/base.mk
include ../scripts/build/generate_go.mk
include ../scripts/build/build.mk
include ../scripts/build/docker.mk
include ../scripts/build/test.mk
include ../scripts/build/lint.mk
include ../scripts/build/vendor.mk
include ../scripts/build/help.mk

prepare-build: generate

docker.dev:
@echo "Building $(IMG) with platform $(PLATFORMS)"
@pwd
time docker buildx build $(DOCKER_BUILD_ARGS) --platform $(PLATFORMS) --load --no-cache -t $(IMG) -f Dockerfile.dev --provenance=false .
12 changes: 12 additions & 0 deletions banyand/dquery/dquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ package dquery
import (
"context"
"errors"
"time"

"go.uber.org/multierr"

Expand Down Expand Up @@ -60,6 +61,7 @@ type queryService struct {
tqp *topNQueryProcessor
closer *run.Closer
nodeID string
slowQuery time.Duration
}

// NewService return a new query service.
Expand Down Expand Up @@ -90,6 +92,16 @@ func (q *queryService) Name() string {
return moduleName
}

func (q *queryService) FlagSet() *run.FlagSet {
fs := run.NewFlagSet("distributed-query")
fs.DurationVar(&q.slowQuery, "dst-slow-query", 0, "distributed slow query threshold, 0 means no slow query log")
return fs
}

func (q *queryService) Validate() error {
return nil
}

func (q *queryService) PreRun(ctx context.Context) error {
val := ctx.Value(common.ContextNodeKey)
if val == nil {
Expand Down
8 changes: 7 additions & 1 deletion banyand/dquery/measure.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ func (p *measureQueryProcessor) Rev(message bus.Message) (resp bus.Message) {
timeRange: queryCriteria.TimeRange,
}))
if err != nil {
ml.Error().Err(err).RawJSON("req", logger.Proto(queryCriteria)).Msg("fail to close the query plan")
ml.Error().Err(err).Dur("latency", time.Since(n)).RawJSON("req", logger.Proto(queryCriteria)).Msg("fail to query")
resp = bus.NewMessage(bus.MessageID(now), common.NewError("fail to execute the query plan for measure %s: %v", meta.GetName(), err))
return
}
Expand Down Expand Up @@ -144,5 +144,11 @@ func (p *measureQueryProcessor) Rev(message bus.Message) (resp bus.Message) {
e.RawJSON("ret", logger.Proto(qr)).Msg("got a measure")
}
resp = bus.NewMessage(bus.MessageID(now), qr)
if !queryCriteria.Trace && p.slowQuery > 0 {
latency := time.Since(n)
if latency > p.slowQuery {
p.log.Warn().Dur("latency", latency).RawJSON("req", logger.Proto(queryCriteria)).Int("resp_count", len(result)).Msg("measure slow query")
}
}
return
}
14 changes: 9 additions & 5 deletions banyand/dquery/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (

"github.com/apache/skywalking-banyandb/api/common"
commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
measurev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1"
streamv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v1"
"github.com/apache/skywalking-banyandb/banyand/stream"
"github.com/apache/skywalking-banyandb/pkg/bus"
Expand Down Expand Up @@ -80,9 +79,9 @@ func (p *streamQueryProcessor) Rev(message bus.Message) (resp bus.Message) {
p.log.Debug().Str("plan", plan.String()).Msg("query plan")
}
ctx := context.Background()
var tracer *query.Tracer
var span *query.Span
if queryCriteria.Trace {
var tracer *query.Tracer
var span *query.Span
tracer, ctx = query.NewTracer(ctx, n.Format(time.RFC3339Nano))
span, ctx = tracer.StartSpan(ctx, "distributed-%s", p.queryService.nodeID)
span.Tag("plan", plan.String())
Expand All @@ -93,7 +92,7 @@ func (p *streamQueryProcessor) Rev(message bus.Message) (resp bus.Message) {
d.Trace = tracer.ToProto()
case common.Error:
span.Error(errors.New(d.Msg()))
resp = bus.NewMessage(bus.MessageID(now), &measurev1.QueryResponse{Trace: tracer.ToProto()})
resp = bus.NewMessage(bus.MessageID(now), &streamv1.QueryResponse{Trace: tracer.ToProto()})
default:
panic("unexpected data type")
}
Expand All @@ -113,6 +112,11 @@ func (p *streamQueryProcessor) Rev(message bus.Message) (resp bus.Message) {
}

resp = bus.NewMessage(bus.MessageID(now), &streamv1.QueryResponse{Elements: entities})

if !queryCriteria.Trace && p.slowQuery > 0 {
latency := time.Since(n)
if latency > p.slowQuery {
p.log.Warn().Dur("latency", latency).RawJSON("req", logger.Proto(queryCriteria)).Int("resp_count", len(entities)).Msg("stream slow query")
}
}
return
}
34 changes: 32 additions & 2 deletions banyand/dquery/topn.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
package dquery

import (
"context"
"errors"
"time"

"go.uber.org/multierr"
Expand All @@ -30,7 +32,9 @@ import (
"github.com/apache/skywalking-banyandb/pkg/bus"
"github.com/apache/skywalking-banyandb/pkg/convert"
"github.com/apache/skywalking-banyandb/pkg/iter/sort"
"github.com/apache/skywalking-banyandb/pkg/logger"
pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
pkgquery "github.com/apache/skywalking-banyandb/pkg/query"
)

const defaultTopNQueryTimeout = 10 * time.Second
Expand All @@ -46,6 +50,7 @@ func (t *topNQueryProcessor) Rev(message bus.Message) (resp bus.Message) {
t.log.Warn().Msg("invalid event data type")
return
}
n := time.Now()
now := bus.MessageID(request.TimeRange.Begin.Nanos)
if request.GetFieldValueSort() == modelv1.Sort_SORT_UNSPECIFIED {
resp = bus.NewMessage(now, common.NewError("unspecified requested sort direction"))
Expand All @@ -56,7 +61,25 @@ func (t *topNQueryProcessor) Rev(message bus.Message) (resp bus.Message) {
return
}
if e := t.log.Debug(); e.Enabled() {
e.Stringer("req", request).Msg("received a topN query event")
e.RawJSON("req", logger.Proto(request)).Msg("received a topN query event")
}
if request.Trace {
tracer, ctx := pkgquery.NewTracer(context.TODO(), n.Format(time.RFC3339Nano))
span, _ := tracer.StartSpan(ctx, "distributed-client")
span.Tag("request", convert.BytesToString(logger.Proto(request)))
defer func() {
data := resp.Data()
switch d := data.(type) {
case *measurev1.TopNResponse:
d.Trace = tracer.ToProto()
case common.Error:
span.Error(errors.New(d.Msg()))
resp = bus.NewMessage(now, &measurev1.TopNResponse{Trace: tracer.ToProto()})
default:
panic("unexpected data type")
}
span.Stop()
}()
}
agg := request.Agg
request.Agg = modelv1.AggregationFunction_AGGREGATION_FUNCTION_UNSPECIFIED
Expand Down Expand Up @@ -103,9 +126,16 @@ func (t *topNQueryProcessor) Rev(message bus.Message) (resp bus.Message) {
resp = bus.NewMessage(now, &measurev1.TopNResponse{})
return
}
lists := aggregator.Val(tags)
resp = bus.NewMessage(now, &measurev1.TopNResponse{
Lists: aggregator.Val(tags),
Lists: lists,
})
if !request.Trace && t.slowQuery > 0 {
latency := time.Since(n)
if latency > t.slowQuery {
t.log.Warn().Dur("latency", latency).RawJSON("req", logger.Proto(request)).Int("resp_count", len(lists)).Msg("top_n slow query")
}
}
return
}

Expand Down
Loading

0 comments on commit 61eb634

Please sign in to comment.