Skip to content

Commit

Permalink
[query] Adding complete tags to remote server with integration test (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
arnikola authored May 21, 2019
1 parent e1a3252 commit 6bfedb7
Show file tree
Hide file tree
Showing 9 changed files with 366 additions and 102 deletions.
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,7 @@ docker-integration-test:
@./scripts/docker-integration-tests/prometheus/test.sh
@./scripts/docker-integration-tests/carbon/test.sh
@./scripts/docker-integration-tests/aggregator/test.sh
@./scripts/docker-integration-tests/query_fanout/test.sh

.PHONY: site-build
site-build:
Expand Down
65 changes: 64 additions & 1 deletion scripts/docker-integration-tests/query_fanout/test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,6 @@ curl -vvvsS -X POST 0.0.0.0:19003/writetagged -d '{
}
}'


function read {
RESPONSE=$(curl "http://0.0.0.0:7201/api/v1/query?query=test_metric")
ACTUAL=$(echo $RESPONSE | jq .data.result[].metric.cluster)
Expand All @@ -90,3 +89,67 @@ function read_sum {
}

ATTEMPTS=5 TIMEOUT=1 retry_with_backoff read_sum

echo "Write local tagged data to cluster a"
curl -vvvsS -X POST 0.0.0.0:19003/writetagged -d '{
"namespace": "unagg",
"id": "{__name__=\"test_metric\",cluster=\"cluster-b\",endpoint=\"/request\",local-only=\"local\"}",
"tags": [
{
"name": "__name__",
"value": "test_metric"
},
{
"name": "cluster",
"value": "cluster-b"
},
{
"name": "endpoint",
"value": "/request"
},
{
"name": "local-only",
"value": "local"
}
],
"datapoint": {
"timestamp":'"$(date +"%s")"',
"value": 42.123456789
}
}'

echo "Write remote tagged data to cluster b"
curl -vvvsS -X POST 0.0.0.0:19003/writetagged -d '{
"namespace": "unagg",
"id": "{__name__=\"test_metric\",cluster=\"cluster-b\",endpoint=\"/request\",remote-only=\"remote\"}",
"tags": [
{
"name": "__name__",
"value": "test_metric"
},
{
"name": "cluster",
"value": "cluster-b"
},
{
"name": "endpoint",
"value": "/request"
},
{
"name": "remote-only",
"value": "remote"
}
],
"datapoint": {
"timestamp":'"$(date +"%s")"',
"value": 42.123456789
}
}'

function complete_tags {
RESPONSE=$(curl "http://0.0.0.0:7201/api/v1/labels")
ACTUAL=$(echo $RESPONSE | jq .data[])
test "$(echo $ACTUAL)" = '"__name__" "cluster" "endpoint" "local-only" "remote-only"'
}

ATTEMPTS=5 TIMEOUT=1 retry_with_backoff complete_tags
210 changes: 140 additions & 70 deletions src/query/generated/proto/rpcpb/query.pb.go

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions src/query/generated/proto/rpcpb/query.proto
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,8 @@ enum CompleteTagsType {
message CompleteTagsRequestOptions {
CompleteTagsType type = 1;
repeated bytes filterNameTags = 2;
int64 start = 3;
int64 end = 4;
}

message CompleteTagsRequest {
Expand Down
20 changes: 17 additions & 3 deletions src/query/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,10 @@ type RunOptions struct {
// InterruptCh is a programmatic interrupt channel to supply to
// interrupt and shutdown the server.
InterruptCh <-chan error

// ListenerCh is a programmatic channel to receive the server listener
// on once it has opened.
ListenerCh chan<- net.Listener
}

// Run runs the server programmatically given a filename for the configuration file.
Expand Down Expand Up @@ -280,11 +284,21 @@ func Run(runOpts RunOptions) {
}
}()

listener, err := net.Listen("tcp", listenAddress)
if err != nil {
logger.Fatal("unable to listen on listen address",
zap.String("address", listenAddress),
zap.Error(err))
}
if runOpts.ListenerCh != nil {
runOpts.ListenerCh <- listener
}
go func() {
logger.Info("starting API server", zap.String("address", listenAddress))
if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed {
logger.Fatal("server error while listening",
zap.String("address", listenAddress), zap.Error(err))
if err := srv.Serve(listener); err != nil && err != http.ErrServerClosed {
logger.Fatal("server serve error",
zap.String("address", listenAddress),
zap.Error(err))
}
}()

Expand Down
55 changes: 30 additions & 25 deletions src/query/server/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,19 +48,17 @@ import (
"google.golang.org/grpc"
)

var queryPort = 25123

var configYAML = `
listenAddress:
type: "config"
value: "127.0.0.1:25123"
value: "127.0.0.1:0"
metrics:
scope:
prefix: "coordinator"
prometheus:
handlerPath: /metrics
listenAddress: "127.0.0.1:18202"
listenAddress: "127.0.0.1:0"
sanitization: prometheus
samplingRate: 1.0
Expand Down Expand Up @@ -88,7 +86,6 @@ writeWorkerPoolPolicy:
`

//TODO: Use randomly assigned port here
func TestRun(t *testing.T) {
ctrl := gomock.NewController(xtest.Reporter{T: t})
defer ctrl.Finish()
Expand Down Expand Up @@ -136,24 +133,30 @@ func TestRun(t *testing.T) {
return dbClient, nil
})

interruptCh := make(chan error)
doneCh := make(chan struct{})
interruptCh := make(chan error, 1)
doneCh := make(chan struct{}, 1)
listenerCh := make(chan net.Listener, 1)
go func() {
Run(RunOptions{
Config: cfg,
InterruptCh: interruptCh,
ListenerCh: listenerCh,
})
doneCh <- struct{}{}
}()

// Wait for listener
listener := <-listenerCh
addr := listener.Addr().String()

// Wait for server to come up
waitForServerHealthy(t, queryPort)
waitForServerHealthy(t, addr)

// Send Prometheus write request
promReq := test.GeneratePromWriteRequest()
promReqBody := test.GeneratePromWriteRequestBody(t, promReq)
req, err := http.NewRequest(http.MethodPost,
fmt.Sprintf("http://127.0.0.1:%d", queryPort)+remote.PromWriteURL, promReqBody)
fmt.Sprintf("http://%s%s", addr, remote.PromWriteURL), promReqBody)
require.NoError(t, err)

res, err := http.DefaultClient.Do(req)
Expand All @@ -179,11 +182,11 @@ func newTestFile(t *testing.T, fileName, contents string) (*os.File, closeFn) {
}
}

func waitForServerHealthy(t *testing.T, port int) {
func waitForServerHealthy(t *testing.T, addr string) {
maxWait := 10 * time.Second
startAt := time.Now()
for time.Since(startAt) < maxWait {
req, err := http.NewRequest("GET", fmt.Sprintf("http://127.0.0.1:%d/health", port), nil)
req, err := http.NewRequest("GET", fmt.Sprintf("http://%s/health", addr), nil)
require.NoError(t, err)
res, err := http.DefaultClient.Do(req)
if err != nil || res.StatusCode != http.StatusOK {
Expand Down Expand Up @@ -232,24 +235,26 @@ func (s *queryServer) CompleteTags(
}

func TestGRPCBackend(t *testing.T) {
var grpcConfigYAML = `
lis, err := net.Listen("tcp", "127.0.0.1:0")
require.NoError(t, err)
grpcAddr := lis.Addr().String()
var grpcConfigYAML = fmt.Sprintf(`
listenAddress:
type: "config"
value: "127.0.0.1:17221"
value: "127.0.0.1:0"
metrics:
scope:
prefix: "coordinator"
prometheus:
handlerPath: /metrics
listenAddress: "127.0.0.1:17223"
listenAddress: "127.0.0.1:0"
onError: stderr
sanitization: prometheus
samplingRate: 1.0
rpc:
remoteListenAddresses:
- "127.0.0.1:17222"
remoteListenAddresses: ["%s"]
backend: grpc
Expand All @@ -268,17 +273,11 @@ writeWorkerPoolPolicy:
size: 100
shards: 1000
killProbability: 0.3
`

// TODO(arnikola): REVERT
t.SkipNow()
`, grpcAddr)

ctrl := gomock.NewController(xtest.Reporter{T: t})
defer ctrl.Finish()

port := "127.0.0.1:17222"
lis, err := net.Listen("tcp", port)
require.NoError(t, err)
s := grpc.NewServer()
defer s.GracefulStop()
qs := &queryServer{}
Expand All @@ -300,22 +299,28 @@ writeWorkerPoolPolicy:

interruptCh := make(chan error)
doneCh := make(chan struct{})
listenerCh := make(chan net.Listener, 1)
go func() {
Run(RunOptions{
Config: cfg,
InterruptCh: interruptCh,
ListenerCh: listenerCh,
})
doneCh <- struct{}{}
}()

// Wait for listener
listener := <-listenerCh
addr := listener.Addr().String()

// Wait for server to come up
waitForServerHealthy(t, 17221)
waitForServerHealthy(t, addr)

// Send Prometheus read request
promReq := test.GeneratePromReadRequest()
promReqBody := test.GeneratePromReadRequestBody(t, promReq)
req, err := http.NewRequest(http.MethodPost,
"http://127.0.0.1:17221"+remote.PromReadURL, promReqBody)
fmt.Sprintf("http://%s%s", addr, remote.PromReadURL), promReqBody)
require.NoError(t, err)

_, err = http.DefaultClient.Do(req)
Expand Down
7 changes: 6 additions & 1 deletion src/query/storage/fanout/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,8 +175,13 @@ func (s *fanoutStorage) CompleteTags(
query *storage.CompleteTagsQuery,
options *storage.FetchOptions,
) (*storage.CompleteTagsResult, error) {
accumulatedTags := storage.NewCompleteTagsResultBuilder(query.CompleteNameOnly)
stores := filterCompleteTagsStores(s.stores, s.completeTagsFilter, *query)
// short circuit complete tags
if len(stores) == 1 {
return stores[0].CompleteTags(ctx, query, options)
}

accumulatedTags := storage.NewCompleteTagsResultBuilder(query.CompleteNameOnly)
for _, store := range stores {
result, err := store.CompleteTags(ctx, query, options)
if err != nil {
Expand Down
74 changes: 74 additions & 0 deletions src/query/tsdb/remote/codecs_complete_tags.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,80 @@ func encodeCompleteTagsRequest(
Options: &rpc.CompleteTagsRequestOptions{
Type: completionType,
FilterNameTags: query.FilterNameTags,
Start: fromTime(query.Start),
End: fromTime(query.End),
},
}, nil
}

func decodeCompleteTagsRequest(
request *rpc.CompleteTagsRequest,
) (*storage.CompleteTagsQuery, error) {
var (
opts = request.GetOptions()
matchers = request.GetTagMatchers()
)

completeNameOnly := opts.GetType() == rpc.CompleteTagsType_TAGNAME
tagMatchers, err := decodeTagMatchers(matchers)
if err != nil {
return nil, err
}

return &storage.CompleteTagsQuery{
CompleteNameOnly: completeNameOnly,
FilterNameTags: opts.GetFilterNameTags(),
TagMatchers: tagMatchers,
Start: toTime(opts.GetStart()),
End: toTime(opts.GetEnd()),
}, nil
}

func encodeToCompressedCompleteTagsDefaultResult(
results *storage.CompleteTagsResult,
) (*rpc.CompleteTagsResponse, error) {
tags := results.CompletedTags
values := make([]*rpc.TagValue, 0, len(tags))
for _, tag := range tags {
values = append(values, &rpc.TagValue{
Key: tag.Name,
Values: tag.Values,
})
}

return &rpc.CompleteTagsResponse{
Value: &rpc.CompleteTagsResponse_Default{
Default: &rpc.TagValues{
Values: values,
},
},
}, nil
}

func encodeToCompressedCompleteTagsNameOnlyResult(
results *storage.CompleteTagsResult,
) (*rpc.CompleteTagsResponse, error) {
tags := results.CompletedTags
names := make([][]byte, 0, len(tags))
for _, tag := range tags {
names = append(names, tag.Name)
}

return &rpc.CompleteTagsResponse{
Value: &rpc.CompleteTagsResponse_NamesOnly{
NamesOnly: &rpc.TagNames{
Names: names,
},
},
}, nil
}

func encodeToCompressedCompleteTagsResult(
results *storage.CompleteTagsResult,
) (*rpc.CompleteTagsResponse, error) {
if results.CompleteNameOnly {
return encodeToCompressedCompleteTagsNameOnlyResult(results)
}

return encodeToCompressedCompleteTagsDefaultResult(results)
}
Loading

0 comments on commit 6bfedb7

Please sign in to comment.