Skip to content

Commit 291756b

Browse files
committed
CASSGO-92: add public method to retrieve StatementMetadata and LogField methods
StatementMetadata can be called on a session to get the bind, result, and pk information for a given query. Previously this wasn't publicly exposed but is necessary for some implementations of HostSelectionPolicy like token-aware. This might also be useful for CI tooling or runtime analysis of queries and the types of columns. NewLogField* are methods to to return a LogField with name and a specific type. Finally, session init was cleaned up to prevent a HostSelectionPolicy from causing a panic if it tried to make a query during init. The interface was documented that queries should not be attempted. Patch by James Hartig for CASSGO-92; reviewed by João Reis for CASSGO-92
1 parent f3e2b39 commit 291756b

File tree

15 files changed

+342
-316
lines changed

15 files changed

+342
-316
lines changed

CHANGELOG.md

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,17 @@ All notable changes to this project will be documented in this file.
55
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
66
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
77

8+
## [2.1.0]
9+
10+
### Added
11+
12+
- Session.StatementMetadata (CASSGO-92)
13+
- NewLogFieldIP, NewLogFieldError, NewLogFieldStringer, NewLogFieldString, NewLogFieldInt, NewLogFieldBool (CASSGO-92)
14+
15+
### Fixed
16+
17+
- Prevent panic with queries during session init (CASSGO-92)
18+
819
## [2.0.0]
920

1021
### Removed

cassandra_test.go

Lines changed: 72 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -2794,66 +2794,62 @@ func TestKeyspaceMetadata(t *testing.T) {
27942794
}
27952795

27962796
// Integration test of the routing key calculation
2797-
func TestRoutingKey(t *testing.T) {
2797+
func TestRoutingStatementMetadata(t *testing.T) {
27982798
session := createSession(t)
27992799
defer session.Close()
28002800

2801-
if err := createTable(session, "CREATE TABLE gocql_test.test_single_routing_key (first_id int, second_id int, PRIMARY KEY (first_id, second_id))"); err != nil {
2801+
if err := createTable(session, "CREATE TABLE gocql_test.test_single_routing_key (first_id int, second_id varchar, PRIMARY KEY (first_id, second_id))"); err != nil {
28022802
t.Fatalf("failed to create table with error '%v'", err)
28032803
}
2804-
if err := createTable(session, "CREATE TABLE gocql_test.test_composite_routing_key (first_id int, second_id int, PRIMARY KEY ((first_id, second_id)))"); err != nil {
2804+
if err := createTable(session, "CREATE TABLE gocql_test.test_composite_routing_key (first_id int, second_id varchar, PRIMARY KEY ((first_id, second_id)))"); err != nil {
28052805
t.Fatalf("failed to create table with error '%v'", err)
28062806
}
28072807

2808-
routingKeyInfo, err := session.routingKeyInfo(context.Background(), "SELECT * FROM test_single_routing_key WHERE second_id=? AND first_id=?", "")
2808+
meta, err := session.routingStatementMetadata(context.Background(), "SELECT * FROM test_single_routing_key WHERE second_id=? AND first_id=?", "")
28092809
if err != nil {
2810-
t.Fatalf("failed to get routing key info due to error: %v", err)
2810+
t.Fatalf("failed to get routing statement metadata due to error: %v", err)
28112811
}
2812-
if routingKeyInfo == nil {
2813-
t.Fatal("Expected routing key info, but was nil")
2812+
if meta == nil {
2813+
t.Fatal("Expected routing statement metadata, but was nil")
28142814
}
2815-
if len(routingKeyInfo.indexes) != 1 {
2816-
t.Fatalf("Expected routing key indexes length to be 1 but was %d", len(routingKeyInfo.indexes))
2815+
if len(meta.PKBindColumnIndexes) != 1 {
2816+
t.Fatalf("Expected routing statement metadata PKBindColumnIndexes length to be 1 but was %d", len(meta.PKBindColumnIndexes))
28172817
}
2818-
if routingKeyInfo.indexes[0] != 1 {
2819-
t.Errorf("Expected routing key index[0] to be 1 but was %d", routingKeyInfo.indexes[0])
2818+
if meta.PKBindColumnIndexes[0] != 1 {
2819+
t.Errorf("Expected routing statement metadata PKBindColumnIndexes[0] to be 1 but was %d", meta.PKBindColumnIndexes[0])
28202820
}
2821-
if len(routingKeyInfo.types) != 1 {
2822-
t.Fatalf("Expected routing key types length to be 1 but was %d", len(routingKeyInfo.types))
2821+
if len(meta.BindColumns) != 2 {
2822+
t.Fatalf("Expected routing statement metadata BindColumns length to be 2 but was %d", len(meta.BindColumns))
28232823
}
2824-
if routingKeyInfo.types[0] == nil {
2825-
t.Fatal("Expected routing key types[0] to be non-nil")
2824+
if meta.BindColumns[0].TypeInfo.Type() != TypeVarchar {
2825+
t.Fatalf("Expected routing statement metadata BindColumns[0].TypeInfo.Type to be %v but was %v", TypeVarchar, meta.BindColumns[0].TypeInfo.Type())
28262826
}
2827-
if routingKeyInfo.types[0].Type() != TypeInt {
2828-
t.Fatalf("Expected routing key types[0].Type to be %v but was %v", TypeInt, routingKeyInfo.types[0].Type())
2827+
if meta.BindColumns[1].TypeInfo.Type() != TypeInt {
2828+
t.Fatalf("Expected routing statement metadata BindColumns[1].TypeInfo.Type to be %v but was %v", TypeInt, meta.BindColumns[1].TypeInfo.Type())
28292829
}
2830-
2831-
// verify the cache is working
2832-
routingKeyInfo, err = session.routingKeyInfo(context.Background(), "SELECT * FROM test_single_routing_key WHERE second_id=? AND first_id=?", "")
2833-
if err != nil {
2834-
t.Fatalf("failed to get routing key info due to error: %v", err)
2830+
if len(meta.ResultColumns) != 2 {
2831+
t.Fatalf("Expected routing statement metadata ResultColumns length to be 2 but was %d", len(meta.ResultColumns))
28352832
}
2836-
if len(routingKeyInfo.indexes) != 1 {
2837-
t.Fatalf("Expected routing key indexes length to be 1 but was %d", len(routingKeyInfo.indexes))
2833+
if meta.ResultColumns[0].Name != "first_id" {
2834+
t.Fatalf("Expected routing statement metadata ResultColumns[0].Name to be %v but was %v", "first_id", meta.ResultColumns[0].Name)
28382835
}
2839-
if routingKeyInfo.indexes[0] != 1 {
2840-
t.Errorf("Expected routing key index[0] to be 1 but was %d", routingKeyInfo.indexes[0])
2836+
if meta.ResultColumns[0].TypeInfo.Type() != TypeInt {
2837+
t.Fatalf("Expected routing statement metadata ResultColumns[0].TypeInfo.Type to be %v but was %v", TypeInt, meta.ResultColumns[0].TypeInfo.Type())
28412838
}
2842-
if len(routingKeyInfo.types) != 1 {
2843-
t.Fatalf("Expected routing key types length to be 1 but was %d", len(routingKeyInfo.types))
2839+
if meta.ResultColumns[1].Name != "second_id" {
2840+
t.Fatalf("Expected routing statement metadata ResultColumns[1].Name to be %v but was %v", "second_id", meta.ResultColumns[1].Name)
28442841
}
2845-
if routingKeyInfo.types[0] == nil {
2846-
t.Fatal("Expected routing key types[0] to be non-nil")
2842+
if meta.ResultColumns[1].TypeInfo.Type() != TypeVarchar {
2843+
t.Fatalf("Expected routing statement metadata ResultColumns[1].TypeInfo.Type to be %v but was %v", TypeVarchar, meta.ResultColumns[1].TypeInfo.Type())
28472844
}
2848-
if routingKeyInfo.types[0].Type() != TypeInt {
2849-
t.Fatalf("Expected routing key types[0] to be %v but was %v", TypeInt, routingKeyInfo.types[0].Type())
2850-
}
2851-
cacheSize := session.routingKeyInfoCache.lru.Len()
2845+
2846+
// verify the cache is working
2847+
cacheSize := session.routingMetadataCache.lru.Len()
28522848
if cacheSize != 1 {
28532849
t.Errorf("Expected cache size to be 1 but was %d", cacheSize)
28542850
}
28552851

2856-
query := newInternalQuery(session.Query("SELECT * FROM test_single_routing_key WHERE second_id=? AND first_id=?", 1, 2), nil)
2852+
query := newInternalQuery(session.Query("SELECT * FROM test_single_routing_key WHERE second_id=? AND first_id=?", "1", 2), nil)
28572853
routingKey, err := query.GetRoutingKey()
28582854
if err != nil {
28592855
t.Fatalf("Failed to get routing key due to error: %v", err)
@@ -2863,50 +2859,59 @@ func TestRoutingKey(t *testing.T) {
28632859
t.Errorf("Expected routing key %v but was %v", expectedRoutingKey, routingKey)
28642860
}
28652861

2866-
routingKeyInfo, err = session.routingKeyInfo(context.Background(), "SELECT * FROM test_composite_routing_key WHERE second_id=? AND first_id=?", "")
2862+
meta, err = session.routingStatementMetadata(context.Background(), "SELECT * FROM test_composite_routing_key WHERE second_id=? AND first_id=?", "")
28672863
if err != nil {
2868-
t.Fatalf("failed to get routing key info due to error: %v", err)
2864+
t.Fatalf("failed to get routing statement metadata due to error: %v", err)
2865+
}
2866+
if meta == nil {
2867+
t.Fatal("Expected routing statement metadata, but was nil")
2868+
}
2869+
if len(meta.PKBindColumnIndexes) != 2 {
2870+
t.Fatalf("Expected routing statement metadata PKBindColumnIndexes length to be 2 but was %d", len(meta.PKBindColumnIndexes))
2871+
}
2872+
if meta.PKBindColumnIndexes[0] != 1 {
2873+
t.Errorf("Expected routing statement metadata PKBindColumnIndexes[0] to be 1 but was %d", meta.PKBindColumnIndexes[0])
28692874
}
2870-
if routingKeyInfo == nil {
2871-
t.Fatal("Expected routing key info, but was nil")
2875+
if meta.PKBindColumnIndexes[1] != 0 {
2876+
t.Errorf("Expected routing statement metadata PKBindColumnIndexes[1] to be 0 but was %d", meta.PKBindColumnIndexes[1])
28722877
}
2873-
if len(routingKeyInfo.indexes) != 2 {
2874-
t.Fatalf("Expected routing key indexes length to be 2 but was %d", len(routingKeyInfo.indexes))
2878+
if len(meta.BindColumns) != 2 {
2879+
t.Fatalf("Expected routing statement metadata BindColumns length to be 2 but was %d", len(meta.BindColumns))
28752880
}
2876-
if routingKeyInfo.indexes[0] != 1 {
2877-
t.Errorf("Expected routing key index[0] to be 1 but was %d", routingKeyInfo.indexes[0])
2881+
if meta.BindColumns[0].TypeInfo.Type() != TypeVarchar {
2882+
t.Fatalf("Expected routing statement metadata BindColumns[0].TypeInfo.Type to be %v but was %v", TypeVarchar, meta.BindColumns[0].TypeInfo.Type())
28782883
}
2879-
if routingKeyInfo.indexes[1] != 0 {
2880-
t.Errorf("Expected routing key index[1] to be 0 but was %d", routingKeyInfo.indexes[1])
2884+
if meta.BindColumns[1].TypeInfo.Type() != TypeInt {
2885+
t.Fatalf("Expected routing statement metadata BindColumns[1].TypeInfo.Type to be %v but was %v", TypeInt, meta.BindColumns[1].TypeInfo.Type())
28812886
}
2882-
if len(routingKeyInfo.types) != 2 {
2883-
t.Fatalf("Expected routing key types length to be 1 but was %d", len(routingKeyInfo.types))
2887+
if len(meta.ResultColumns) != 2 {
2888+
t.Fatalf("Expected routing statement metadata ResultColumns length to be 2 but was %d", len(meta.ResultColumns))
28842889
}
2885-
if routingKeyInfo.types[0] == nil {
2886-
t.Fatal("Expected routing key types[0] to be non-nil")
2890+
if meta.ResultColumns[0].Name != "first_id" {
2891+
t.Fatalf("Expected routing statement metadata ResultColumns[0].Name to be %v but was %v", "first_id", meta.ResultColumns[0].Name)
28872892
}
2888-
if routingKeyInfo.types[0].Type() != TypeInt {
2889-
t.Fatalf("Expected routing key types[0] to be %v but was %v", TypeInt, routingKeyInfo.types[0].Type())
2893+
if meta.ResultColumns[0].TypeInfo.Type() != TypeInt {
2894+
t.Fatalf("Expected routing statement metadata ResultColumns[0].TypeInfo.Type to be %v but was %v", TypeInt, meta.ResultColumns[0].TypeInfo.Type())
28902895
}
2891-
if routingKeyInfo.types[1] == nil {
2892-
t.Fatal("Expected routing key types[1] to be non-nil")
2896+
if meta.ResultColumns[1].Name != "second_id" {
2897+
t.Fatalf("Expected routing statement metadata ResultColumns[1].Name to be %v but was %v", "second_id", meta.ResultColumns[1].Name)
28932898
}
2894-
if routingKeyInfo.types[1].Type() != TypeInt {
2895-
t.Fatalf("Expected routing key types[0] to be %v but was %v", TypeInt, routingKeyInfo.types[1].Type())
2899+
if meta.ResultColumns[1].TypeInfo.Type() != TypeVarchar {
2900+
t.Fatalf("Expected routing statement metadata ResultColumns[1].TypeInfo.Type to be %v but was %v", TypeVarchar, meta.ResultColumns[1].TypeInfo.Type())
28962901
}
28972902

2898-
query = newInternalQuery(session.Query("SELECT * FROM test_composite_routing_key WHERE second_id=? AND first_id=?", 1, 2), nil)
2903+
query = newInternalQuery(session.Query("SELECT * FROM test_composite_routing_key WHERE second_id=? AND first_id=?", "1", 2), nil)
28992904
routingKey, err = query.GetRoutingKey()
29002905
if err != nil {
29012906
t.Fatalf("Failed to get routing key due to error: %v", err)
29022907
}
2903-
expectedRoutingKey = []byte{0, 4, 0, 0, 0, 2, 0, 0, 4, 0, 0, 0, 1, 0}
2908+
expectedRoutingKey = []byte{0, 4, 0, 0, 0, 2, 0, 0, 1, 49, 0}
29042909
if !reflect.DeepEqual(expectedRoutingKey, routingKey) {
29052910
t.Errorf("Expected routing key %v but was %v", expectedRoutingKey, routingKey)
29062911
}
29072912

29082913
// verify the cache is working
2909-
cacheSize = session.routingKeyInfoCache.lru.Len()
2914+
cacheSize = session.routingMetadataCache.lru.Len()
29102915
if cacheSize != 2 {
29112916
t.Errorf("Expected cache size to be 2 but was %d", cacheSize)
29122917
}
@@ -3956,17 +3961,17 @@ func TestRoutingKeyCacheUsesOverriddenKeyspace(t *testing.T) {
39563961
t.Fatal(err)
39573962
}
39583963

3959-
getRoutingKeyInfo := func(key string) *routingKeyInfo {
3964+
getStatementMetadata := func(key string) *StatementMetadata {
39603965
t.Helper()
3961-
session.routingKeyInfoCache.mu.Lock()
3962-
value, ok := session.routingKeyInfoCache.lru.Get(key)
3966+
session.routingMetadataCache.mu.Lock()
3967+
value, ok := session.routingMetadataCache.lru.Get(key)
39633968
if !ok {
39643969
t.Fatalf("routing key not found in cache for key %v", key)
39653970
}
3966-
session.routingKeyInfoCache.mu.Unlock()
3971+
session.routingMetadataCache.mu.Unlock()
39673972

39683973
inflight := value.(*inflightCachedEntry)
3969-
return inflight.value.(*routingKeyInfo)
3974+
return inflight.value.(*StatementMetadata)
39703975
}
39713976

39723977
const insertQuery = "INSERT INTO routing_key_cache_uses_overridden_ks (id) VALUES (?)"
@@ -3979,8 +3984,8 @@ func TestRoutingKeyCacheUsesOverriddenKeyspace(t *testing.T) {
39793984
require.NoError(t, err)
39803985

39813986
// Ensuring that the cache contains the query with default ks
3982-
routingKeyInfo1 := getRoutingKeyInfo("gocql_test" + b1.Entries[0].Stmt)
3983-
require.Equal(t, "gocql_test", routingKeyInfo1.keyspace)
3987+
meta1 := getStatementMetadata("gocql_test" + b1.Entries[0].Stmt)
3988+
require.Equal(t, "gocql_test", meta1.Keyspace)
39843989

39853990
// Running batch in gocql_test_routing_key_cache ks
39863991
b2 := session.Batch(LoggedBatch)
@@ -3991,8 +3996,8 @@ func TestRoutingKeyCacheUsesOverriddenKeyspace(t *testing.T) {
39913996
require.NoError(t, err)
39923997

39933998
// Ensuring that the cache contains the query with gocql_test_routing_key_cache ks
3994-
routingKeyInfo2 := getRoutingKeyInfo("gocql_test_routing_key_cache" + b2.Entries[0].Stmt)
3995-
require.Equal(t, "gocql_test_routing_key_cache", routingKeyInfo2.keyspace)
3999+
meta2 := getStatementMetadata("gocql_test_routing_key_cache" + b2.Entries[0].Stmt)
4000+
require.Equal(t, "gocql_test_routing_key_cache", meta2.Keyspace)
39964001

39974002
const selectStmt = "SELECT * FROM routing_key_cache_uses_overridden_ks WHERE id=?"
39984003

cluster.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -352,8 +352,8 @@ func (cfg *ClusterConfig) translateAddressPort(addr net.IP, port int, logger Str
352352
}
353353
newAddr, newPort := cfg.AddressTranslator.Translate(addr, port)
354354
logger.Debug("Translating address.",
355-
newLogFieldIp("old_addr", addr), newLogFieldInt("old_port", port),
356-
newLogFieldIp("new_addr", newAddr), newLogFieldInt("new_port", newPort))
355+
NewLogFieldIP("old_addr", addr), NewLogFieldInt("old_port", port),
356+
NewLogFieldIP("new_addr", newAddr), NewLogFieldInt("new_port", newPort))
357357
return newAddr, newPort
358358
}
359359

conn.go

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -709,7 +709,7 @@ func (c *Conn) processFrame(ctx context.Context, r io.Reader) error {
709709
delete(c.calls, head.stream)
710710
c.mu.Unlock()
711711
if call == nil || !ok {
712-
c.logger.Warning("Received response for stream which has no handler.", newLogFieldString("header", head.String()))
712+
c.logger.Warning("Received response for stream which has no handler.", NewLogFieldString("header", head.String()))
713713
return c.discardFrame(r, head)
714714
} else if head.stream != call.streamID {
715715
panic(fmt.Sprintf("call has incorrect streamID: got %d expected %d", call.streamID, head.stream))
@@ -1316,7 +1316,7 @@ func (c *Conn) execInternal(ctx context.Context, req frameBuilder, tracer Tracer
13161316
responseFrame, err := resp.framer.parseFrame()
13171317
if err != nil {
13181318
c.logger.Warning("Framer error while attempting to parse potential protocol error.",
1319-
newLogFieldError("err", err))
1319+
NewLogFieldError("err", err))
13201320
return nil, errProtocol
13211321
}
13221322
//goland:noinspection GoTypeAssertionOnErrors
@@ -1333,17 +1333,17 @@ func (c *Conn) execInternal(ctx context.Context, req frameBuilder, tracer Tracer
13331333
case <-timeoutCh:
13341334
close(call.timeout)
13351335
c.logger.Debug("Request timed out on connection.",
1336-
newLogFieldString("host_id", c.host.HostID()), newLogFieldIp("addr", c.host.ConnectAddress()))
1336+
NewLogFieldString("host_id", c.host.HostID()), NewLogFieldIP("addr", c.host.ConnectAddress()))
13371337
return nil, ErrTimeoutNoResponse
13381338
case <-ctxDone:
13391339
c.logger.Debug("Request failed because context elapsed out on connection.",
1340-
newLogFieldString("host_id", c.host.HostID()), newLogFieldIp("addr", c.host.ConnectAddress()),
1341-
newLogFieldError("ctx_err", ctx.Err()))
1340+
NewLogFieldString("host_id", c.host.HostID()), NewLogFieldIP("addr", c.host.ConnectAddress()),
1341+
NewLogFieldError("ctx_err", ctx.Err()))
13421342
close(call.timeout)
13431343
return nil, ctx.Err()
13441344
case <-c.ctx.Done():
13451345
c.logger.Debug("Request failed because connection closed.",
1346-
newLogFieldString("host_id", c.host.HostID()), newLogFieldIp("addr", c.host.ConnectAddress()))
1346+
NewLogFieldString("host_id", c.host.HostID()), NewLogFieldIP("addr", c.host.ConnectAddress()))
13471347
close(call.timeout)
13481348
return nil, ErrConnectionClosed
13491349
}
@@ -1698,7 +1698,7 @@ func (c *Conn) executeQuery(ctx context.Context, q *internalQuery) *Iter {
16981698
iter.framer = framer
16991699
if err := c.awaitSchemaAgreement(ctx); err != nil {
17001700
// TODO: should have this behind a flag
1701-
c.logger.Warning("Error while awaiting for schema agreement after a schema change event.", newLogFieldError("err", err))
1701+
c.logger.Warning("Error while awaiting for schema agreement after a schema change event.", NewLogFieldError("err", err))
17021702
}
17031703
// dont return an error from this, might be a good idea to give a warning
17041704
// though. The impact of this returning an error would be that the cluster
@@ -1956,7 +1956,7 @@ func (c *Conn) awaitSchemaAgreement(ctx context.Context) (err error) {
19561956
goto cont
19571957
}
19581958
if !isValidPeer(host) || host.schemaVersion == "" {
1959-
c.logger.Warning("Invalid peer or peer with empty schema_version.", newLogFieldIp("peer", host.ConnectAddress()))
1959+
c.logger.Warning("Invalid peer or peer with empty schema_version.", NewLogFieldIP("peer", host.ConnectAddress()))
19601960
continue
19611961
}
19621962

conn_test.go

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -356,13 +356,13 @@ func (o *testQueryObserver) ObserveQuery(ctx context.Context, q ObservedQuery) {
356356
host := q.Host.ConnectAddress().String()
357357
o.metrics[host] = q.Metrics
358358
o.logger.Debug("Observed query.",
359-
newLogFieldString("stmt", q.Statement),
360-
newLogFieldInt("rows", q.Rows),
361-
newLogFieldString("duration", q.End.Sub(q.Start).String()),
362-
newLogFieldString("host", host),
363-
newLogFieldInt("attempts", q.Metrics.Attempts),
364-
newLogFieldString("latency", strconv.FormatInt(q.Metrics.TotalLatency, 10)),
365-
newLogFieldError("err", q.Err))
359+
NewLogFieldString("stmt", q.Statement),
360+
NewLogFieldInt("rows", q.Rows),
361+
NewLogFieldString("duration", q.End.Sub(q.Start).String()),
362+
NewLogFieldString("host", host),
363+
NewLogFieldInt("attempts", q.Metrics.Attempts),
364+
NewLogFieldString("latency", strconv.FormatInt(q.Metrics.TotalLatency, 10)),
365+
NewLogFieldError("err", q.Err))
366366
}
367367

368368
func (o *testQueryObserver) GetMetrics(host *HostInfo) *hostMetrics {

0 commit comments

Comments
 (0)