-
Notifications
You must be signed in to change notification settings - Fork 622
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
CASSGO-39 Add query attempt interceptor #1820
base: trunk
Are you sure you want to change the base?
Changes from 4 commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,88 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one | ||
* or more contributor license agreements. See the NOTICE file | ||
* distributed with this work for additional information | ||
* regarding copyright ownership. The ASF licenses this file | ||
* to you under the Apache License, Version 2.0 (the | ||
* "License"); you may not use this file except in compliance | ||
* with the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
/* | ||
* Content before git sha 34fdeebefcbf183ed7f916f931aa0586fdaa1b40 | ||
* Copyright (c) 2016, The Gocql authors, | ||
* provided under the BSD-3-Clause License. | ||
* See the NOTICE file distributed with this work for additional information. | ||
*/ | ||
|
||
package gocql_test | ||
|
||
import ( | ||
"context" | ||
"log" | ||
"time" | ||
|
||
gocql "github.com/gocql/gocql" | ||
) | ||
|
||
type MyQueryAttemptInterceptor struct { | ||
injectFault bool | ||
} | ||
|
||
func (q MyQueryAttemptInterceptor) Intercept( | ||
ctx context.Context, | ||
attempt gocql.QueryAttempt, | ||
handler gocql.QueryAttemptHandler, | ||
) *gocql.Iter { | ||
switch q := attempt.Query.(type) { | ||
case *gocql.Query: | ||
// Inspect or modify query | ||
attempt.Query = q | ||
case *gocql.Batch: | ||
// Inspect or modify batch | ||
attempt.Query = q | ||
} | ||
|
||
// Inspect or modify context | ||
ctx = context.WithValue(ctx, "trace-id", "123") | ||
|
||
// Optionally bypass the handler and return an error to prevent query execution. | ||
// For example, to simulate query timeouts. | ||
if q.injectFault && attempt.Attempts == 0 { | ||
<-time.After(1 * time.Second) | ||
return gocql.NewIterWithErr(gocql.RequestErrWriteTimeout{}) | ||
} | ||
|
||
// The interceptor *must* invoke the handler to execute the query. | ||
return handler(ctx, attempt) | ||
} | ||
|
||
// Example_interceptor demonstrates how to implement a QueryAttemptInterceptor. | ||
func Example_interceptor() { | ||
cluster := gocql.NewCluster("localhost:9042") | ||
cluster.QueryAttemptInterceptor = MyQueryAttemptInterceptor{injectFault: true} | ||
|
||
session, err := cluster.CreateSession() | ||
if err != nil { | ||
log.Fatal(err) | ||
} | ||
defer session.Close() | ||
|
||
ctx := context.Background() | ||
|
||
var stringValue string | ||
err = session.Query("select now() from system.local"). | ||
WithContext(ctx). | ||
RetryPolicy(&gocql.SimpleRetryPolicy{NumRetries: 2}). | ||
Scan(&stringValue) | ||
if err != nil { | ||
log.Fatalf("query failed %T", err) | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -34,7 +34,7 @@ type ExecutableQuery interface { | |
borrowForExecution() // Used to ensure that the query stays alive for lifetime of a particular execution goroutine. | ||
releaseAfterExecution() // Used when a goroutine finishes its execution attempts, either with ok result or an error. | ||
execute(ctx context.Context, conn *Conn) *Iter | ||
attempt(keyspace string, end, start time.Time, iter *Iter, host *HostInfo) | ||
attempt(ctx context.Context, keyspace string, end, start time.Time, iter *Iter, host *HostInfo) | ||
retryPolicy() RetryPolicy | ||
speculativeExecutionPolicy() SpeculativeExecutionPolicy | ||
GetRoutingKey() ([]byte, error) | ||
|
@@ -48,16 +48,60 @@ type ExecutableQuery interface { | |
} | ||
|
||
type queryExecutor struct { | ||
pool *policyConnPool | ||
policy HostSelectionPolicy | ||
pool *policyConnPool | ||
policy HostSelectionPolicy | ||
interceptor QueryAttemptInterceptor | ||
} | ||
|
||
type QueryAttempt struct { | ||
// The query to execute, either a *gocql.Query or *gocql.Batch. | ||
Query ExecutableQuery | ||
// The connection used to execute the query. | ||
Conn *Conn | ||
// The host that will receive the query. | ||
Host *HostInfo | ||
Comment on lines
+59
to
+62
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do we need to add and expose these fields in If there’s no compelling need for direct connection manipulation, then maybe we could expose only a connection metadata that’s relevant for logging or monitoring purposes? |
||
// The number of previous query attempts. 0 for the initial attempt, 1 for the first retry, etc. | ||
Attempts int | ||
} | ||
|
||
// QueryAttemptHandler is a function that attempts query execution. | ||
type QueryAttemptHandler = func(context.Context, QueryAttempt) *Iter | ||
|
||
// QueryAttemptInterceptor is the interface implemented by query interceptors / middleware. | ||
// | ||
// Interceptors are well-suited to logic that is not specific to a single query or batch. | ||
type QueryAttemptInterceptor interface { | ||
// Intercept is invoked once immediately before a query execution attempt, including retry attempts and | ||
// speculative execution attempts. | ||
|
||
// The interceptor is responsible for calling the `handler` function and returning the handler result. Failure to | ||
// call the handler will panic. If the interceptor wants to halt query execution and prevent retries, it should | ||
// return an error. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nitpick: The function does not have an question: How does returning an error prevent a retry? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Updated
Returning a non-retriable error like |
||
Intercept(ctx context.Context, attempt QueryAttempt, handler QueryAttemptHandler) *Iter | ||
} | ||
|
||
func (q *queryExecutor) attemptQuery(ctx context.Context, qry ExecutableQuery, conn *Conn) *Iter { | ||
start := time.Now() | ||
iter := qry.execute(ctx, conn) | ||
end := time.Now() | ||
var iter *Iter | ||
if q.interceptor != nil { | ||
// Propagate interceptor context modifications. | ||
_ctx := ctx | ||
attempt := QueryAttempt{ | ||
Query: qry, | ||
Conn: conn, | ||
Host: conn.host, | ||
Attempts: qry.Attempts(), | ||
} | ||
iter = q.interceptor.Intercept(_ctx, attempt, func(_ctx context.Context, attempt QueryAttempt) *Iter { | ||
ctx = _ctx | ||
return attempt.Query.execute(ctx, attempt.Conn) | ||
}) | ||
} else { | ||
iter = qry.execute(ctx, conn) | ||
} | ||
|
||
qry.attempt(q.pool.keyspace, end, start, iter, conn.host) | ||
end := time.Now() | ||
qry.attempt(ctx, q.pool.keyspace, end, start, iter, conn.host) | ||
|
||
return iter | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How about adding an option to apply the interceptor selectively to specific queries, like
.WithInterceptor()
method to the Query struct?