From 156eabdbac0fc6cb133d521442be5b6609f2d0c8 Mon Sep 17 00:00:00 2001 From: Ben Eddy Date: Tue, 27 Aug 2024 09:53:04 -0700 Subject: [PATCH 1/7] Add query attempt interceptor --- cluster.go | 4 ++ doc.go | 12 +++++ example_interceptor_test.go | 92 +++++++++++++++++++++++++++++++++++++ query_executor.go | 41 ++++++++++++++--- session.go | 18 +++++--- 5 files changed, 155 insertions(+), 12 deletions(-) create mode 100644 example_interceptor_test.go diff --git a/cluster.go b/cluster.go index 13e62f3b0..1fd81fd65 100644 --- a/cluster.go +++ b/cluster.go @@ -214,6 +214,10 @@ type ClusterConfig struct { // See https://issues.apache.org/jira/browse/CASSANDRA-10786 DisableSkipMetadata bool + // QueryAttemptInterceptor will set the provided query interceptor on all queries created from this session. + // Use it to intercept and modify queries by providing an implementation of QueryAttemptInterceptor. + QueryAttemptInterceptor QueryAttemptInterceptor + // QueryObserver will set the provided query observer on all queries created from this session. // Use it to collect metrics / stats from queries by providing an implementation of QueryObserver. QueryObserver QueryObserver diff --git a/doc.go b/doc.go index f23e812c5..8b08aa4d6 100644 --- a/doc.go +++ b/doc.go @@ -362,6 +362,18 @@ // // See Example_userDefinedTypesMap, Example_userDefinedTypesStruct, ExampleUDTMarshaler, ExampleUDTUnmarshaler. // +// # Interceptors +// +// A QueryAttemptInterceptor wraps query execution and can be used to inject logic that should apply to all query +// and batch execution attempts. For example, interceptors can be used for rate limiting, logging, attaching +// distributed tracing metadata to the context, modifying queries, and inspecting query results. +// +// A QueryAttemptInterceptor will be invoked once prior to each query execution attempt, including retry attempts +// and speculative execution attempts. Interceptors are responsible for calling the provided handler and returning +// a non-nil Iter. +// +// See Example_interceptor for full example. +// // # Metrics and tracing // // It is possible to provide observer implementations that could be used to gather metrics: diff --git a/example_interceptor_test.go b/example_interceptor_test.go new file mode 100644 index 000000000..3b36a813d --- /dev/null +++ b/example_interceptor_test.go @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/* + * Content before git sha 34fdeebefcbf183ed7f916f931aa0586fdaa1b40 + * Copyright (c) 2016, The Gocql authors, + * provided under the BSD-3-Clause License. + * See the NOTICE file distributed with this work for additional information. + */ + +package gocql_test + +import ( + "context" + "fmt" + "log" + "time" + + gocql "github.com/gocql/gocql" +) + +type MyQueryAttemptInterceptor struct { + injectFault bool +} + +func (q MyQueryAttemptInterceptor) Intercept( + ctx context.Context, + query gocql.ExecutableQuery, + conn *gocql.Conn, + handler gocql.QueryAttemptHandler, +) *gocql.Iter { + switch q := query.(type) { + case *gocql.Query: + // Inspect or modify query + query = q + case *gocql.Batch: + // Inspect or modify batch + query = q + } + + // Inspect or modify context + ctx = context.WithValue(ctx, "trace-id", "123") + + // Optionally bypass the handler and return an error to prevent query execution. + // For example, to simulate query timeouts. + if q.injectFault && query.Attempts() == 0 { + <-time.After(1 * time.Second) + return gocql.NewIterWithErr(gocql.RequestErrWriteTimeout{}) + } + + // The interceptor *must* invoke the handler to execute the query. + return handler(ctx, query, conn) +} + +// Example_interceptor demonstrates how to implement a QueryAttemptInterceptor. +func Example_interceptor() { + cluster := gocql.NewCluster("localhost:9042") + cluster.QueryAttemptInterceptor = MyQueryAttemptInterceptor{injectFault: true} + + session, err := cluster.CreateSession() + if err != nil { + log.Fatal(err) + } + defer session.Close() + + ctx := context.Background() + + var stringValue string + err = session.Query("select now() from system.local"). + WithContext(ctx). + RetryPolicy(&gocql.SimpleRetryPolicy{NumRetries: 2}). + Scan(&stringValue) + if err != nil { + log.Fatalf("query failed %T", err) + } + fmt.Println(stringValue) + // Output: MOOOO! +} diff --git a/query_executor.go b/query_executor.go index fb68b07f2..caa1618f3 100644 --- a/query_executor.go +++ b/query_executor.go @@ -34,7 +34,7 @@ type ExecutableQuery interface { borrowForExecution() // Used to ensure that the query stays alive for lifetime of a particular execution goroutine. releaseAfterExecution() // Used when a goroutine finishes its execution attempts, either with ok result or an error. execute(ctx context.Context, conn *Conn) *Iter - attempt(keyspace string, end, start time.Time, iter *Iter, host *HostInfo) + attempt(ctx context.Context, keyspace string, end, start time.Time, iter *Iter, host *HostInfo) retryPolicy() RetryPolicy speculativeExecutionPolicy() SpeculativeExecutionPolicy GetRoutingKey() ([]byte, error) @@ -48,16 +48,45 @@ type ExecutableQuery interface { } type queryExecutor struct { - pool *policyConnPool - policy HostSelectionPolicy + pool *policyConnPool + policy HostSelectionPolicy + interceptor QueryAttemptInterceptor +} + +// QueryAttemptHandler is a function that attempts query execution. +type QueryAttemptHandler = func(context.Context, ExecutableQuery, *Conn) *Iter + +// QueryAttemptInterceptor is the interface implemented by query interceptors / middleware. +// +// Interceptors are well-suited to logic that is not specific to a single query or batch. +type QueryAttemptInterceptor interface { + // Intercept is invoked once immediately before a query execution attempt, including retry attempts and + // speculative execution attempts. + + // The interceptor is responsible for calling the `handler` function and returning the handler result. Failure to + // call the handler will panic. If the interceptor wants to halt query execution and prevent retries, it should + // return an error. + Intercept(ctx context.Context, query ExecutableQuery, conn *Conn, handler QueryAttemptHandler) *Iter } func (q *queryExecutor) attemptQuery(ctx context.Context, qry ExecutableQuery, conn *Conn) *Iter { start := time.Now() - iter := qry.execute(ctx, conn) - end := time.Now() - qry.attempt(q.pool.keyspace, end, start, iter, conn.host) + var iter *Iter + if q.interceptor != nil { + // Propagate interceptor context modifications. + _ctx := ctx + iter = q.interceptor.Intercept(_ctx, qry, conn, func(_ctx context.Context, qry ExecutableQuery, c *Conn) *Iter { + ctx = _ctx + return qry.execute(ctx, conn) + }) + } else { + iter = qry.execute(ctx, conn) + return iter + } + + end := time.Now() + qry.attempt(ctx, q.pool.keyspace, end, start, iter, conn.host) return iter } diff --git a/session.go b/session.go index a600b95f3..5d2f35f4e 100644 --- a/session.go +++ b/session.go @@ -178,8 +178,9 @@ func NewSession(cfg ClusterConfig) (*Session, error) { s.policy.Init(s) s.executor = &queryExecutor{ - pool: s.pool, - policy: cfg.PoolConfig.HostSelectionPolicy, + pool: s.pool, + policy: cfg.PoolConfig.HostSelectionPolicy, + interceptor: cfg.QueryAttemptInterceptor, } s.queryObserver = cfg.QueryObserver @@ -1111,12 +1112,12 @@ func (q *Query) execute(ctx context.Context, conn *Conn) *Iter { return conn.executeQuery(ctx, q) } -func (q *Query) attempt(keyspace string, end, start time.Time, iter *Iter, host *HostInfo) { +func (q *Query) attempt(ctx context.Context, keyspace string, end, start time.Time, iter *Iter, host *HostInfo) { latency := end.Sub(start) attempt, metricsForHost := q.metrics.attempt(1, latency, host, q.observer != nil) if q.observer != nil { - q.observer.ObserveQuery(q.Context(), ObservedQuery{ + q.observer.ObserveQuery(ctx, ObservedQuery{ Keyspace: keyspace, Statement: q.stmt, Values: q.values, @@ -1448,6 +1449,11 @@ func (iter *Iter) Columns() []ColumnInfo { return iter.meta.columns } +// NewIterWithErr return a new *Iter with an error. +func NewIterWithErr(err error) *Iter { + return &Iter{err: err} +} + type Scanner interface { // Next advances the row pointer to point at the next row, the row is valid until // the next call of Next. It returns true if there is a row which is available to be @@ -1942,7 +1948,7 @@ func (b *Batch) WithTimestamp(timestamp int64) *Batch { return b } -func (b *Batch) attempt(keyspace string, end, start time.Time, iter *Iter, host *HostInfo) { +func (b *Batch) attempt(ctx context.Context, keyspace string, end, start time.Time, iter *Iter, host *HostInfo) { latency := end.Sub(start) attempt, metricsForHost := b.metrics.attempt(1, latency, host, b.observer != nil) @@ -1958,7 +1964,7 @@ func (b *Batch) attempt(keyspace string, end, start time.Time, iter *Iter, host values[i] = entry.Args } - b.observer.ObserveBatch(b.Context(), ObservedBatch{ + b.observer.ObserveBatch(ctx, ObservedBatch{ Keyspace: keyspace, Statements: statements, Values: values, From cada3e5eaa0ef555336786a2974a304786d73e69 Mon Sep 17 00:00:00 2001 From: Ben Eddy Date: Thu, 12 Sep 2024 16:42:14 -0700 Subject: [PATCH 2/7] Remove early return Removes return statement that bypassed query attempt tracking. --- query_executor.go | 1 - 1 file changed, 1 deletion(-) diff --git a/query_executor.go b/query_executor.go index caa1618f3..19c75f994 100644 --- a/query_executor.go +++ b/query_executor.go @@ -82,7 +82,6 @@ func (q *queryExecutor) attemptQuery(ctx context.Context, qry ExecutableQuery, c }) } else { iter = qry.execute(ctx, conn) - return iter } end := time.Now() From 09a4f9a1cd8f46922f5e2cefc218261d2882ca37 Mon Sep 17 00:00:00 2001 From: Ben Eddy Date: Fri, 13 Sep 2024 11:16:04 -0700 Subject: [PATCH 3/7] Pass QueryAttempt type to interceptor --- example_interceptor_test.go | 16 ++++++---------- query_executor.go | 26 +++++++++++++++++++++----- 2 files changed, 27 insertions(+), 15 deletions(-) diff --git a/example_interceptor_test.go b/example_interceptor_test.go index 3b36a813d..bf6883b31 100644 --- a/example_interceptor_test.go +++ b/example_interceptor_test.go @@ -26,7 +26,6 @@ package gocql_test import ( "context" - "fmt" "log" "time" @@ -39,17 +38,16 @@ type MyQueryAttemptInterceptor struct { func (q MyQueryAttemptInterceptor) Intercept( ctx context.Context, - query gocql.ExecutableQuery, - conn *gocql.Conn, + attempt gocql.QueryAttempt, handler gocql.QueryAttemptHandler, ) *gocql.Iter { - switch q := query.(type) { + switch q := attempt.Query.(type) { case *gocql.Query: // Inspect or modify query - query = q + attempt.Query = q case *gocql.Batch: // Inspect or modify batch - query = q + attempt.Query = q } // Inspect or modify context @@ -57,13 +55,13 @@ func (q MyQueryAttemptInterceptor) Intercept( // Optionally bypass the handler and return an error to prevent query execution. // For example, to simulate query timeouts. - if q.injectFault && query.Attempts() == 0 { + if q.injectFault && attempt.Number == 1 { <-time.After(1 * time.Second) return gocql.NewIterWithErr(gocql.RequestErrWriteTimeout{}) } // The interceptor *must* invoke the handler to execute the query. - return handler(ctx, query, conn) + return handler(ctx, attempt) } // Example_interceptor demonstrates how to implement a QueryAttemptInterceptor. @@ -87,6 +85,4 @@ func Example_interceptor() { if err != nil { log.Fatalf("query failed %T", err) } - fmt.Println(stringValue) - // Output: MOOOO! } diff --git a/query_executor.go b/query_executor.go index 19c75f994..ad4f8b76a 100644 --- a/query_executor.go +++ b/query_executor.go @@ -53,8 +53,19 @@ type queryExecutor struct { interceptor QueryAttemptInterceptor } +type QueryAttempt struct { + // The query to execute, either a *gocql.Query or *gocql.Batch. + Query ExecutableQuery + // The connection used to execute the query. + Conn *Conn + // The host that will receive the query. + Host *HostInfo + // The number of previous query attempts. 0 for the initial attempt, 1 for the first retry, etc. + Attempts int +} + // QueryAttemptHandler is a function that attempts query execution. -type QueryAttemptHandler = func(context.Context, ExecutableQuery, *Conn) *Iter +type QueryAttemptHandler = func(context.Context, QueryAttempt) *Iter // QueryAttemptInterceptor is the interface implemented by query interceptors / middleware. // @@ -66,19 +77,24 @@ type QueryAttemptInterceptor interface { // The interceptor is responsible for calling the `handler` function and returning the handler result. Failure to // call the handler will panic. If the interceptor wants to halt query execution and prevent retries, it should // return an error. - Intercept(ctx context.Context, query ExecutableQuery, conn *Conn, handler QueryAttemptHandler) *Iter + Intercept(ctx context.Context, attempt QueryAttempt, handler QueryAttemptHandler) *Iter } func (q *queryExecutor) attemptQuery(ctx context.Context, qry ExecutableQuery, conn *Conn) *Iter { start := time.Now() - var iter *Iter if q.interceptor != nil { // Propagate interceptor context modifications. _ctx := ctx - iter = q.interceptor.Intercept(_ctx, qry, conn, func(_ctx context.Context, qry ExecutableQuery, c *Conn) *Iter { + attempt := QueryAttempt{ + Query: qry, + Conn: conn, + Host: conn.host, + Attempts: qry.Attempts(), + } + iter = q.interceptor.Intercept(_ctx, attempt, func(_ctx context.Context, attempt QueryAttempt) *Iter { ctx = _ctx - return qry.execute(ctx, conn) + return attempt.Query.execute(ctx, attempt.Conn) }) } else { iter = qry.execute(ctx, conn) From 23c8336a0e9c19e10e47057c4176d3700165aa17 Mon Sep 17 00:00:00 2001 From: Ben Eddy Date: Fri, 13 Sep 2024 11:32:32 -0700 Subject: [PATCH 4/7] Fix interceptor example --- example_interceptor_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/example_interceptor_test.go b/example_interceptor_test.go index bf6883b31..a7f3e56c9 100644 --- a/example_interceptor_test.go +++ b/example_interceptor_test.go @@ -55,7 +55,7 @@ func (q MyQueryAttemptInterceptor) Intercept( // Optionally bypass the handler and return an error to prevent query execution. // For example, to simulate query timeouts. - if q.injectFault && attempt.Number == 1 { + if q.injectFault && attempt.Attempts == 0 { <-time.After(1 * time.Second) return gocql.NewIterWithErr(gocql.RequestErrWriteTimeout{}) } From 835b0361140c98945a8d53f594cb6de1987ba279 Mon Sep 17 00:00:00 2001 From: Ben Eddy Date: Tue, 17 Sep 2024 14:47:07 -0700 Subject: [PATCH 5/7] Allow interceptor to return error Remove gocql.NewIterWithErr --- doc.go | 2 +- example_interceptor_test.go | 6 +++--- query_executor.go | 14 +++++++++----- session.go | 5 ----- 4 files changed, 13 insertions(+), 14 deletions(-) diff --git a/doc.go b/doc.go index 8b08aa4d6..94ad85211 100644 --- a/doc.go +++ b/doc.go @@ -370,7 +370,7 @@ // // A QueryAttemptInterceptor will be invoked once prior to each query execution attempt, including retry attempts // and speculative execution attempts. Interceptors are responsible for calling the provided handler and returning -// a non-nil Iter. +// a non-nil Iter or an error. // // See Example_interceptor for full example. // diff --git a/example_interceptor_test.go b/example_interceptor_test.go index a7f3e56c9..67c1c974b 100644 --- a/example_interceptor_test.go +++ b/example_interceptor_test.go @@ -40,7 +40,7 @@ func (q MyQueryAttemptInterceptor) Intercept( ctx context.Context, attempt gocql.QueryAttempt, handler gocql.QueryAttemptHandler, -) *gocql.Iter { +) (*gocql.Iter, error) { switch q := attempt.Query.(type) { case *gocql.Query: // Inspect or modify query @@ -57,11 +57,11 @@ func (q MyQueryAttemptInterceptor) Intercept( // For example, to simulate query timeouts. if q.injectFault && attempt.Attempts == 0 { <-time.After(1 * time.Second) - return gocql.NewIterWithErr(gocql.RequestErrWriteTimeout{}) + return nil, gocql.RequestErrWriteTimeout{} } // The interceptor *must* invoke the handler to execute the query. - return handler(ctx, attempt) + return handler(ctx, attempt), nil } // Example_interceptor demonstrates how to implement a QueryAttemptInterceptor. diff --git a/query_executor.go b/query_executor.go index ad4f8b76a..f5955c9fe 100644 --- a/query_executor.go +++ b/query_executor.go @@ -74,15 +74,16 @@ type QueryAttemptInterceptor interface { // Intercept is invoked once immediately before a query execution attempt, including retry attempts and // speculative execution attempts. - // The interceptor is responsible for calling the `handler` function and returning the handler result. Failure to - // call the handler will panic. If the interceptor wants to halt query execution and prevent retries, it should - // return an error. - Intercept(ctx context.Context, attempt QueryAttempt, handler QueryAttemptHandler) *Iter + // The interceptor is responsible for calling the `handler` function and returning the handler result. If the + // interceptor wants to bypass the handler and skip query execution, it should return an error. Failure to + // return either the handler result or an error will panic. + Intercept(ctx context.Context, attempt QueryAttempt, handler QueryAttemptHandler) (*Iter, error) } func (q *queryExecutor) attemptQuery(ctx context.Context, qry ExecutableQuery, conn *Conn) *Iter { start := time.Now() var iter *Iter + var err error if q.interceptor != nil { // Propagate interceptor context modifications. _ctx := ctx @@ -92,10 +93,13 @@ func (q *queryExecutor) attemptQuery(ctx context.Context, qry ExecutableQuery, c Host: conn.host, Attempts: qry.Attempts(), } - iter = q.interceptor.Intercept(_ctx, attempt, func(_ctx context.Context, attempt QueryAttempt) *Iter { + iter, err = q.interceptor.Intercept(_ctx, attempt, func(_ctx context.Context, attempt QueryAttempt) *Iter { ctx = _ctx return attempt.Query.execute(ctx, attempt.Conn) }) + if err != nil { + iter = &Iter{err: err} + } } else { iter = qry.execute(ctx, conn) } diff --git a/session.go b/session.go index 5d2f35f4e..8fe777bac 100644 --- a/session.go +++ b/session.go @@ -1449,11 +1449,6 @@ func (iter *Iter) Columns() []ColumnInfo { return iter.meta.columns } -// NewIterWithErr return a new *Iter with an error. -func NewIterWithErr(err error) *Iter { - return &Iter{err: err} -} - type Scanner interface { // Next advances the row pointer to point at the next row, the row is valid until // the next call of Next. It returns true if there is a row which is available to be From ca8e0a43c9a41c70f8174a88992e63fdea8c73db Mon Sep 17 00:00:00 2001 From: Ben Eddy Date: Mon, 2 Dec 2024 14:17:33 -0800 Subject: [PATCH 6/7] Update QueryAttemptHandler signature To facilitate chaining interceptors --- example_interceptor_test.go | 53 ++++++++++++++++++++++++++++++++++++- query_executor.go | 7 ++--- 2 files changed, 56 insertions(+), 4 deletions(-) diff --git a/example_interceptor_test.go b/example_interceptor_test.go index 67c1c974b..097fa4eb2 100644 --- a/example_interceptor_test.go +++ b/example_interceptor_test.go @@ -61,7 +61,7 @@ func (q MyQueryAttemptInterceptor) Intercept( } // The interceptor *must* invoke the handler to execute the query. - return handler(ctx, attempt), nil + return handler(ctx, attempt) } // Example_interceptor demonstrates how to implement a QueryAttemptInterceptor. @@ -86,3 +86,54 @@ func Example_interceptor() { log.Fatalf("query failed %T", err) } } + +type QueryAttemptInterceptorChain struct { + interceptors []gocql.QueryAttemptInterceptor +} + +func (c QueryAttemptInterceptorChain) Intercept( + ctx context.Context, + attempt gocql.QueryAttempt, + handler gocql.QueryAttemptHandler, +) (*gocql.Iter, error) { + return c.interceptors[0].Intercept(ctx, attempt, c.getNextHandler(0, handler)) +} + +func (c QueryAttemptInterceptorChain) getNextHandler(curr int, final gocql.QueryAttemptHandler) gocql.QueryAttemptHandler { + if curr == len(c.interceptors)-1 { + return final + } + + return func(ctx context.Context, attempt gocql.QueryAttempt) (*gocql.Iter, error) { + return c.interceptors[curr+1].Intercept(ctx, attempt, c.getNextHandler(curr+1, final)) + } +} + +// Example_interceptor_chain demonstrates how to chain QueryAttemptInterceptors. +func Example_interceptor_chain() { + cluster := gocql.NewCluster("localhost:9042") + cluster.QueryAttemptInterceptor = QueryAttemptInterceptorChain{ + []gocql.QueryAttemptInterceptor{ + MyQueryAttemptInterceptor{}, + MyQueryAttemptInterceptor{}, + MyQueryAttemptInterceptor{}, + }, + } + + session, err := cluster.CreateSession() + if err != nil { + log.Fatal(err) + } + defer session.Close() + + ctx := context.Background() + + var stringValue string + err = session.Query("select now() from system.local"). + WithContext(ctx). + RetryPolicy(&gocql.SimpleRetryPolicy{NumRetries: 2}). + Scan(&stringValue) + if err != nil { + log.Fatalf("query failed %T", err) + } +} diff --git a/query_executor.go b/query_executor.go index f5955c9fe..ac168f882 100644 --- a/query_executor.go +++ b/query_executor.go @@ -65,7 +65,7 @@ type QueryAttempt struct { } // QueryAttemptHandler is a function that attempts query execution. -type QueryAttemptHandler = func(context.Context, QueryAttempt) *Iter +type QueryAttemptHandler = func(context.Context, QueryAttempt) (*Iter, error) // QueryAttemptInterceptor is the interface implemented by query interceptors / middleware. // @@ -93,9 +93,10 @@ func (q *queryExecutor) attemptQuery(ctx context.Context, qry ExecutableQuery, c Host: conn.host, Attempts: qry.Attempts(), } - iter, err = q.interceptor.Intercept(_ctx, attempt, func(_ctx context.Context, attempt QueryAttempt) *Iter { + iter, err = q.interceptor.Intercept(_ctx, attempt, func(_ctx context.Context, attempt QueryAttempt) (*Iter, error) { ctx = _ctx - return attempt.Query.execute(ctx, attempt.Conn) + iter := attempt.Query.execute(ctx, attempt.Conn) + return iter, iter.err }) if err != nil { iter = &Iter{err: err} From 96de34e620601174b1b72c005680004ec8cfd0ef Mon Sep 17 00:00:00 2001 From: Ben Eddy Date: Mon, 2 Dec 2024 14:23:29 -0800 Subject: [PATCH 7/7] Remove gocql.Conn from gocql.QueryAttempt Replace with read-only addr fields. --- query_executor.go | 18 +++++++++++------- 1 file changed, 11 insertions(+), 7 deletions(-) diff --git a/query_executor.go b/query_executor.go index ac168f882..03f0c3aa0 100644 --- a/query_executor.go +++ b/query_executor.go @@ -26,6 +26,7 @@ package gocql import ( "context" + "net" "sync" "time" ) @@ -56,10 +57,12 @@ type queryExecutor struct { type QueryAttempt struct { // The query to execute, either a *gocql.Query or *gocql.Batch. Query ExecutableQuery - // The connection used to execute the query. - Conn *Conn // The host that will receive the query. Host *HostInfo + // The local address of the connection used to execute the query. + LocalAddr net.Addr + // The remote address of the connection used to execute the query. + RemoteAddr net.Addr // The number of previous query attempts. 0 for the initial attempt, 1 for the first retry, etc. Attempts int } @@ -88,14 +91,15 @@ func (q *queryExecutor) attemptQuery(ctx context.Context, qry ExecutableQuery, c // Propagate interceptor context modifications. _ctx := ctx attempt := QueryAttempt{ - Query: qry, - Conn: conn, - Host: conn.host, - Attempts: qry.Attempts(), + Query: qry, + Host: conn.host, + LocalAddr: conn.conn.LocalAddr(), + RemoteAddr: conn.conn.RemoteAddr(), + Attempts: qry.Attempts(), } iter, err = q.interceptor.Intercept(_ctx, attempt, func(_ctx context.Context, attempt QueryAttempt) (*Iter, error) { ctx = _ctx - iter := attempt.Query.execute(ctx, attempt.Conn) + iter := attempt.Query.execute(ctx, conn) return iter, iter.err }) if err != nil {